client.rs

  1use crate::{
  2    adapters::DebugAdapterBinary,
  3    transport::{IoKind, LogKind, TransportDelegate},
  4};
  5use anyhow::Result;
  6use dap_types::{
  7    messages::{Message, Response},
  8    requests::Request,
  9};
 10use futures::channel::oneshot;
 11use gpui::AsyncApp;
 12use std::{
 13    hash::Hash,
 14    sync::atomic::{AtomicU64, Ordering},
 15};
 16
 17#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
 18#[repr(transparent)]
 19pub struct SessionId(pub u32);
 20
 21impl SessionId {
 22    pub const fn from_proto(client_id: u64) -> Self {
 23        Self(client_id as u32)
 24    }
 25
 26    pub const fn to_proto(self) -> u64 {
 27        self.0 as u64
 28    }
 29}
 30
 31/// Represents a connection to the debug adapter process, either via stdout/stdin or a socket.
 32pub struct DebugAdapterClient {
 33    id: SessionId,
 34    sequence_count: AtomicU64,
 35    binary: DebugAdapterBinary,
 36    transport_delegate: TransportDelegate,
 37}
 38
 39pub type DapMessageHandler = Box<dyn FnMut(Message) + 'static + Send + Sync>;
 40
 41impl DebugAdapterClient {
 42    pub async fn start(
 43        id: SessionId,
 44        binary: DebugAdapterBinary,
 45        message_handler: DapMessageHandler,
 46        cx: &mut AsyncApp,
 47    ) -> Result<Self> {
 48        let transport_delegate = TransportDelegate::start(&binary, cx).await?;
 49        let this = Self {
 50            id,
 51            binary,
 52            transport_delegate,
 53            sequence_count: AtomicU64::new(1),
 54        };
 55        this.connect(message_handler, cx).await?;
 56
 57        Ok(this)
 58    }
 59
 60    pub fn should_reconnect_for_ssh(&self) -> bool {
 61        self.transport_delegate.tcp_arguments().is_some()
 62            && self.binary.command.as_deref() == Some("ssh")
 63    }
 64
 65    pub async fn connect(
 66        &self,
 67        message_handler: DapMessageHandler,
 68        cx: &mut AsyncApp,
 69    ) -> Result<()> {
 70        self.transport_delegate.connect(message_handler, cx).await
 71    }
 72
 73    pub async fn create_child_connection(
 74        &self,
 75        session_id: SessionId,
 76        binary: DebugAdapterBinary,
 77        message_handler: DapMessageHandler,
 78        cx: &mut AsyncApp,
 79    ) -> Result<Self> {
 80        let binary = if let Some(connection) = self.transport_delegate.tcp_arguments() {
 81            DebugAdapterBinary {
 82                command: None,
 83                arguments: Default::default(),
 84                envs: Default::default(),
 85                cwd: Default::default(),
 86                connection: Some(connection),
 87                request_args: binary.request_args,
 88            }
 89        } else {
 90            self.binary.clone()
 91        };
 92
 93        Self::start(session_id, binary, message_handler, cx).await
 94    }
 95
 96    /// Send a request to an adapter and get a response back
 97    /// Note: This function will block until a response is sent back from the adapter
 98    pub async fn request<R: Request>(&self, arguments: R::Arguments) -> Result<R::Response> {
 99        let serialized_arguments = serde_json::to_value(arguments)?;
100
101        let (callback_tx, callback_rx) = oneshot::channel::<Result<Response>>();
102
103        let sequence_id = self.next_sequence_id();
104
105        let request = crate::messages::Request {
106            seq: sequence_id,
107            command: R::COMMAND.to_string(),
108            arguments: Some(serialized_arguments),
109        };
110        self.transport_delegate
111            .pending_requests
112            .lock()
113            .insert(sequence_id, callback_tx)?;
114
115        log::debug!(
116            "Client {} send `{}` request with sequence_id: {}",
117            self.id.0,
118            R::COMMAND,
119            sequence_id
120        );
121        log::debug!("  request: {request:?}");
122
123        self.send_message(Message::Request(request)).await?;
124
125        let command = R::COMMAND.to_string();
126
127        let response = callback_rx.await??;
128        log::debug!(
129            "Client {} received response for: `{}` sequence_id: {}",
130            self.id.0,
131            command,
132            sequence_id
133        );
134        log::debug!("  response: {response:?}");
135
136        match response.success {
137            true => {
138                if let Some(json) = response.body {
139                    Ok(serde_json::from_value(json)?)
140                // Note: dap types configure themselves to return `None` when an empty object is received,
141                // which then fails here...
142                } else if let Ok(result) =
143                    serde_json::from_value(serde_json::Value::Object(Default::default()))
144                {
145                    Ok(result)
146                } else {
147                    Ok(serde_json::from_value(Default::default())?)
148                }
149            }
150            false => anyhow::bail!("Request failed: {}", response.message.unwrap_or_default()),
151        }
152    }
153
154    pub async fn send_message(&self, message: Message) -> Result<()> {
155        self.transport_delegate.send_message(message).await
156    }
157
158    pub const fn id(&self) -> SessionId {
159        self.id
160    }
161
162    pub const fn binary(&self) -> &DebugAdapterBinary {
163        &self.binary
164    }
165
166    /// Get the next sequence id to be used in a request
167    pub fn next_sequence_id(&self) -> u64 {
168        self.sequence_count.fetch_add(1, Ordering::Relaxed)
169    }
170
171    pub fn kill(&self) {
172        log::debug!("Killing DAP process");
173        self.transport_delegate.transport.lock().kill();
174        self.transport_delegate.pending_requests.lock().shutdown();
175    }
176
177    pub fn has_adapter_logs(&self) -> bool {
178        self.transport_delegate.has_adapter_logs()
179    }
180
181    pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
182    where
183        F: 'static + Send + FnMut(IoKind, Option<&str>, &str),
184    {
185        self.transport_delegate.add_log_handler(f, kind);
186    }
187
188    #[cfg(any(test, feature = "test-support"))]
189    pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
190    where
191        F: 'static
192            + Send
193            + FnMut(u64, R::Arguments) -> Result<R::Response, dap_types::ErrorResponse>,
194    {
195        use crate::transport::RequestHandling;
196
197        self.transport_delegate
198            .transport
199            .lock()
200            .as_fake()
201            .on_request::<R, _>(move |seq, request| {
202                RequestHandling::Respond(handler(seq, request))
203            });
204    }
205
206    #[cfg(any(test, feature = "test-support"))]
207    pub fn on_request_ext<R: dap_types::requests::Request, F>(&self, handler: F)
208    where
209        F: 'static
210            + Send
211            + FnMut(
212                u64,
213                R::Arguments,
214            ) -> crate::transport::RequestHandling<
215                Result<R::Response, dap_types::ErrorResponse>,
216            >,
217    {
218        self.transport_delegate
219            .transport
220            .lock()
221            .as_fake()
222            .on_request::<R, F>(handler);
223    }
224
225    #[cfg(any(test, feature = "test-support"))]
226    pub async fn fake_reverse_request<R: dap_types::requests::Request>(&self, args: R::Arguments) {
227        self.send_message(Message::Request(dap_types::messages::Request {
228            seq: self.sequence_count.load(Ordering::Relaxed),
229            command: R::COMMAND.into(),
230            arguments: serde_json::to_value(args).ok(),
231        }))
232        .await
233        .unwrap();
234    }
235
236    #[cfg(any(test, feature = "test-support"))]
237    pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
238    where
239        F: 'static + Send + Fn(Response),
240    {
241        self.transport_delegate
242            .transport
243            .lock()
244            .as_fake()
245            .on_response::<R, F>(handler);
246    }
247
248    #[cfg(any(test, feature = "test-support"))]
249    pub async fn fake_event(&self, event: dap_types::messages::Events) {
250        self.send_message(Message::Event(Box::new(event)))
251            .await
252            .unwrap();
253    }
254}
255
256#[cfg(test)]
257mod tests {
258    use super::*;
259    use crate::{client::DebugAdapterClient, debugger_settings::DebuggerSettings};
260    use dap_types::{
261        Capabilities, InitializeRequestArguments, InitializeRequestArgumentsPathFormat,
262        RunInTerminalRequestArguments, StartDebuggingRequestArguments,
263        messages::Events,
264        requests::{Initialize, Request, RunInTerminal},
265    };
266    use gpui::TestAppContext;
267    use serde_json::json;
268    use settings::{Settings, SettingsStore};
269    use std::sync::{
270        Arc,
271        atomic::{AtomicBool, Ordering},
272    };
273
274    pub fn init_test(cx: &mut gpui::TestAppContext) {
275        zlog::init_test();
276
277        cx.update(|cx| {
278            let settings = SettingsStore::test(cx);
279            cx.set_global(settings);
280            DebuggerSettings::register(cx);
281        });
282    }
283
284    #[gpui::test]
285    pub async fn test_initialize_client(cx: &mut TestAppContext) {
286        init_test(cx);
287
288        let client = DebugAdapterClient::start(
289            crate::client::SessionId(1),
290            DebugAdapterBinary {
291                command: Some("command".into()),
292                arguments: Default::default(),
293                envs: Default::default(),
294                connection: None,
295                cwd: None,
296                request_args: StartDebuggingRequestArguments {
297                    configuration: serde_json::Value::Null,
298                    request: dap_types::StartDebuggingRequestArgumentsRequest::Launch,
299                },
300            },
301            Box::new(|_| {}),
302            &mut cx.to_async(),
303        )
304        .await
305        .unwrap();
306
307        client.on_request::<Initialize, _>(move |_, _| {
308            Ok(dap_types::Capabilities {
309                supports_configuration_done_request: Some(true),
310                ..Default::default()
311            })
312        });
313
314        cx.run_until_parked();
315
316        let response = client
317            .request::<Initialize>(InitializeRequestArguments {
318                client_id: Some("zed".to_owned()),
319                client_name: Some("Zed".to_owned()),
320                adapter_id: "fake-adapter".to_owned(),
321                locale: Some("en-US".to_owned()),
322                path_format: Some(InitializeRequestArgumentsPathFormat::Path),
323                supports_variable_type: Some(true),
324                supports_variable_paging: Some(false),
325                supports_run_in_terminal_request: Some(true),
326                supports_memory_references: Some(true),
327                supports_progress_reporting: Some(false),
328                supports_invalidated_event: Some(false),
329                lines_start_at1: Some(true),
330                columns_start_at1: Some(true),
331                supports_memory_event: Some(false),
332                supports_args_can_be_interpreted_by_shell: Some(false),
333                supports_start_debugging_request: Some(true),
334                supports_ansistyling: Some(false),
335            })
336            .await
337            .unwrap();
338
339        cx.run_until_parked();
340
341        assert_eq!(
342            dap_types::Capabilities {
343                supports_configuration_done_request: Some(true),
344                ..Default::default()
345            },
346            response
347        );
348    }
349
350    #[gpui::test]
351    pub async fn test_calls_event_handler(cx: &mut TestAppContext) {
352        init_test(cx);
353
354        let called_event_handler = Arc::new(AtomicBool::new(false));
355
356        let client = DebugAdapterClient::start(
357            crate::client::SessionId(1),
358            DebugAdapterBinary {
359                command: Some("command".into()),
360                arguments: Default::default(),
361                envs: Default::default(),
362                connection: None,
363                cwd: None,
364                request_args: StartDebuggingRequestArguments {
365                    configuration: serde_json::Value::Null,
366                    request: dap_types::StartDebuggingRequestArgumentsRequest::Launch,
367                },
368            },
369            Box::new({
370                let called_event_handler = called_event_handler.clone();
371                move |event| {
372                    called_event_handler.store(true, Ordering::SeqCst);
373
374                    assert_eq!(
375                        Message::Event(Box::new(Events::Initialized(
376                            Some(Capabilities::default())
377                        ))),
378                        event
379                    );
380                }
381            }),
382            &mut cx.to_async(),
383        )
384        .await
385        .unwrap();
386
387        cx.run_until_parked();
388
389        client
390            .fake_event(Events::Initialized(Some(Capabilities::default())))
391            .await;
392
393        cx.run_until_parked();
394
395        assert!(
396            called_event_handler.load(std::sync::atomic::Ordering::SeqCst),
397            "Event handler was not called"
398        );
399    }
400
401    #[gpui::test]
402    pub async fn test_calls_event_handler_for_reverse_request(cx: &mut TestAppContext) {
403        init_test(cx);
404
405        let called_event_handler = Arc::new(AtomicBool::new(false));
406
407        let client = DebugAdapterClient::start(
408            crate::client::SessionId(1),
409            DebugAdapterBinary {
410                command: Some("command".into()),
411                arguments: Default::default(),
412                envs: Default::default(),
413                connection: None,
414                cwd: None,
415                request_args: dap_types::StartDebuggingRequestArguments {
416                    configuration: serde_json::Value::Null,
417                    request: dap_types::StartDebuggingRequestArgumentsRequest::Launch,
418                },
419            },
420            Box::new({
421                let called_event_handler = called_event_handler.clone();
422                move |event| {
423                    called_event_handler.store(true, Ordering::SeqCst);
424
425                    assert_eq!(
426                        Message::Request(dap_types::messages::Request {
427                            seq: 1,
428                            command: RunInTerminal::COMMAND.into(),
429                            arguments: Some(json!({
430                                "cwd": "/project/path/src",
431                                "args": ["node", "test.js"],
432                            }))
433                        }),
434                        event
435                    );
436                }
437            }),
438            &mut cx.to_async(),
439        )
440        .await
441        .unwrap();
442
443        cx.run_until_parked();
444
445        client
446            .fake_reverse_request::<RunInTerminal>(RunInTerminalRequestArguments {
447                kind: None,
448                title: None,
449                cwd: "/project/path/src".into(),
450                args: vec!["node".into(), "test.js".into()],
451                env: None,
452                args_can_be_interpreted_by_shell: None,
453            })
454            .await;
455
456        cx.run_until_parked();
457
458        assert!(
459            called_event_handler.load(std::sync::atomic::Ordering::SeqCst),
460            "Event handler was not called"
461        );
462    }
463}