client.rs

  1use crate::{
  2    adapters::DebugAdapterBinary,
  3    transport::{IoKind, LogKind, TransportDelegate},
  4};
  5use anyhow::Result;
  6use dap_types::{
  7    messages::{Message, Response},
  8    requests::Request,
  9};
 10use futures::channel::oneshot;
 11use gpui::AsyncApp;
 12use std::{
 13    hash::Hash,
 14    sync::atomic::{AtomicU64, Ordering},
 15};
 16
 17#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
 18#[repr(transparent)]
 19pub struct SessionId(pub u32);
 20
 21impl SessionId {
 22    pub fn from_proto(client_id: u64) -> Self {
 23        Self(client_id as u32)
 24    }
 25
 26    pub fn to_proto(self) -> u64 {
 27        self.0 as u64
 28    }
 29}
 30
 31/// Represents a connection to the debug adapter process, either via stdout/stdin or a socket.
 32pub struct DebugAdapterClient {
 33    id: SessionId,
 34    sequence_count: AtomicU64,
 35    binary: DebugAdapterBinary,
 36    transport_delegate: TransportDelegate,
 37}
 38
 39pub type DapMessageHandler = Box<dyn FnMut(Message) + 'static + Send + Sync>;
 40
 41impl DebugAdapterClient {
 42    pub async fn start(
 43        id: SessionId,
 44        binary: DebugAdapterBinary,
 45        message_handler: DapMessageHandler,
 46        cx: &mut AsyncApp,
 47    ) -> Result<Self> {
 48        let transport_delegate = TransportDelegate::start(&binary, cx).await?;
 49        let this = Self {
 50            id,
 51            binary,
 52            transport_delegate,
 53            sequence_count: AtomicU64::new(1),
 54        };
 55        this.connect(message_handler, cx).await?;
 56
 57        Ok(this)
 58    }
 59
 60    pub fn should_reconnect_for_ssh(&self) -> bool {
 61        self.transport_delegate.tcp_arguments().is_some()
 62            && (self.binary.command.as_deref() == Some("ssh")
 63                || (cfg!(feature = "test-support")
 64                    && self.binary.command.as_deref() == Some("mock")))
 65    }
 66
 67    pub async fn connect(
 68        &self,
 69        message_handler: DapMessageHandler,
 70        cx: &mut AsyncApp,
 71    ) -> Result<()> {
 72        self.transport_delegate.connect(message_handler, cx).await
 73    }
 74
 75    pub async fn create_child_connection(
 76        &self,
 77        session_id: SessionId,
 78        binary: DebugAdapterBinary,
 79        message_handler: DapMessageHandler,
 80        cx: &mut AsyncApp,
 81    ) -> Result<Self> {
 82        let binary = if let Some(connection) = self.transport_delegate.tcp_arguments() {
 83            DebugAdapterBinary {
 84                command: None,
 85                arguments: Default::default(),
 86                envs: Default::default(),
 87                cwd: Default::default(),
 88                connection: Some(connection),
 89                request_args: binary.request_args,
 90            }
 91        } else {
 92            self.binary.clone()
 93        };
 94
 95        Self::start(session_id, binary, message_handler, cx).await
 96    }
 97
 98    /// Send a request to an adapter and get a response back
 99    /// Note: This function will block until a response is sent back from the adapter
100    pub async fn request<R: Request>(&self, arguments: R::Arguments) -> Result<R::Response> {
101        let serialized_arguments = serde_json::to_value(arguments)?;
102
103        let (callback_tx, callback_rx) = oneshot::channel::<Result<Response>>();
104
105        let sequence_id = self.next_sequence_id();
106
107        let request = crate::messages::Request {
108            seq: sequence_id,
109            command: R::COMMAND.to_string(),
110            arguments: Some(serialized_arguments),
111        };
112        self.transport_delegate
113            .pending_requests
114            .lock()
115            .insert(sequence_id, callback_tx)?;
116
117        log::debug!(
118            "Client {} send `{}` request with sequence_id: {}",
119            self.id.0,
120            R::COMMAND,
121            sequence_id
122        );
123        log::debug!("  request: {request:?}");
124
125        self.send_message(Message::Request(request)).await?;
126
127        let command = R::COMMAND.to_string();
128
129        let response = callback_rx.await??;
130        log::debug!(
131            "Client {} received response for: `{}` sequence_id: {}",
132            self.id.0,
133            command,
134            sequence_id
135        );
136        log::debug!("  response: {response:?}");
137
138        match response.success {
139            true => {
140                if let Some(json) = response.body {
141                    Ok(serde_json::from_value(json)?)
142                // Note: dap types configure themselves to return `None` when an empty object is received,
143                // which then fails here...
144                } else if let Ok(result) =
145                    serde_json::from_value(serde_json::Value::Object(Default::default()))
146                {
147                    Ok(result)
148                } else {
149                    Ok(serde_json::from_value(Default::default())?)
150                }
151            }
152            false => anyhow::bail!("Request failed: {}", response.message.unwrap_or_default()),
153        }
154    }
155
156    pub async fn send_message(&self, message: Message) -> Result<()> {
157        self.transport_delegate.send_message(message).await
158    }
159
160    pub fn id(&self) -> SessionId {
161        self.id
162    }
163
164    pub fn binary(&self) -> &DebugAdapterBinary {
165        &self.binary
166    }
167
168    /// Get the next sequence id to be used in a request
169    pub fn next_sequence_id(&self) -> u64 {
170        self.sequence_count.fetch_add(1, Ordering::Relaxed)
171    }
172
173    pub fn kill(&self) {
174        log::debug!("Killing DAP process");
175        self.transport_delegate.transport.lock().kill();
176        self.transport_delegate.pending_requests.lock().shutdown();
177    }
178
179    pub fn has_adapter_logs(&self) -> bool {
180        self.transport_delegate.has_adapter_logs()
181    }
182
183    pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
184    where
185        F: 'static + Send + FnMut(IoKind, Option<&str>, &str),
186    {
187        self.transport_delegate.add_log_handler(f, kind);
188    }
189
190    #[cfg(any(test, feature = "test-support"))]
191    pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
192    where
193        F: 'static
194            + Send
195            + FnMut(u64, R::Arguments) -> Result<R::Response, dap_types::ErrorResponse>,
196    {
197        use crate::transport::RequestHandling;
198
199        self.transport_delegate
200            .transport
201            .lock()
202            .as_fake()
203            .on_request::<R, _>(move |seq, request| {
204                RequestHandling::Respond(handler(seq, request))
205            });
206    }
207
208    #[cfg(any(test, feature = "test-support"))]
209    pub fn on_request_ext<R: dap_types::requests::Request, F>(&self, handler: F)
210    where
211        F: 'static
212            + Send
213            + FnMut(
214                u64,
215                R::Arguments,
216            ) -> crate::transport::RequestHandling<
217                Result<R::Response, dap_types::ErrorResponse>,
218            >,
219    {
220        self.transport_delegate
221            .transport
222            .lock()
223            .as_fake()
224            .on_request::<R, F>(handler);
225    }
226
227    #[cfg(any(test, feature = "test-support"))]
228    pub async fn fake_reverse_request<R: dap_types::requests::Request>(&self, args: R::Arguments) {
229        self.send_message(Message::Request(dap_types::messages::Request {
230            seq: self.sequence_count.load(Ordering::Relaxed),
231            command: R::COMMAND.into(),
232            arguments: serde_json::to_value(args).ok(),
233        }))
234        .await
235        .unwrap();
236    }
237
238    #[cfg(any(test, feature = "test-support"))]
239    pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
240    where
241        F: 'static + Send + Fn(Response),
242    {
243        self.transport_delegate
244            .transport
245            .lock()
246            .as_fake()
247            .on_response::<R, F>(handler);
248    }
249
250    #[cfg(any(test, feature = "test-support"))]
251    pub async fn fake_event(&self, event: dap_types::messages::Events) {
252        self.send_message(Message::Event(Box::new(event)))
253            .await
254            .unwrap();
255    }
256}
257
258#[cfg(test)]
259mod tests {
260    use super::*;
261    use crate::client::DebugAdapterClient;
262    use dap_types::{
263        Capabilities, InitializeRequestArguments, InitializeRequestArgumentsPathFormat,
264        RunInTerminalRequestArguments, StartDebuggingRequestArguments,
265        messages::Events,
266        requests::{Initialize, Request, RunInTerminal},
267    };
268    use gpui::TestAppContext;
269    use serde_json::json;
270    use settings::SettingsStore;
271    use std::sync::{
272        Arc,
273        atomic::{AtomicBool, Ordering},
274    };
275
276    pub fn init_test(cx: &mut gpui::TestAppContext) {
277        zlog::init_test();
278
279        cx.update(|cx| {
280            let settings = SettingsStore::test(cx);
281            cx.set_global(settings);
282        });
283    }
284
285    #[gpui::test]
286    pub async fn test_initialize_client(cx: &mut TestAppContext) {
287        #![expect(clippy::result_large_err)]
288        init_test(cx);
289
290        let client = DebugAdapterClient::start(
291            crate::client::SessionId(1),
292            DebugAdapterBinary {
293                command: Some("command".into()),
294                arguments: Default::default(),
295                envs: Default::default(),
296                connection: None,
297                cwd: None,
298                request_args: StartDebuggingRequestArguments {
299                    configuration: serde_json::Value::Null,
300                    request: dap_types::StartDebuggingRequestArgumentsRequest::Launch,
301                },
302            },
303            Box::new(|_| {}),
304            &mut cx.to_async(),
305        )
306        .await
307        .unwrap();
308
309        client.on_request::<Initialize, _>(move |_, _| {
310            Ok(dap_types::Capabilities {
311                supports_configuration_done_request: Some(true),
312                ..Default::default()
313            })
314        });
315
316        cx.run_until_parked();
317
318        let response = client
319            .request::<Initialize>(InitializeRequestArguments {
320                client_id: Some("zed".to_owned()),
321                client_name: Some("Zed".to_owned()),
322                adapter_id: "fake-adapter".to_owned(),
323                locale: Some("en-US".to_owned()),
324                path_format: Some(InitializeRequestArgumentsPathFormat::Path),
325                supports_variable_type: Some(true),
326                supports_variable_paging: Some(false),
327                supports_run_in_terminal_request: Some(true),
328                supports_memory_references: Some(true),
329                supports_progress_reporting: Some(false),
330                supports_invalidated_event: Some(false),
331                lines_start_at1: Some(true),
332                columns_start_at1: Some(true),
333                supports_memory_event: Some(false),
334                supports_args_can_be_interpreted_by_shell: Some(false),
335                supports_start_debugging_request: Some(true),
336                supports_ansistyling: Some(false),
337            })
338            .await
339            .unwrap();
340
341        cx.run_until_parked();
342
343        assert_eq!(
344            dap_types::Capabilities {
345                supports_configuration_done_request: Some(true),
346                ..Default::default()
347            },
348            response
349        );
350    }
351
352    #[gpui::test]
353    pub async fn test_calls_event_handler(cx: &mut TestAppContext) {
354        init_test(cx);
355
356        let called_event_handler = Arc::new(AtomicBool::new(false));
357
358        let client = DebugAdapterClient::start(
359            crate::client::SessionId(1),
360            DebugAdapterBinary {
361                command: Some("command".into()),
362                arguments: Default::default(),
363                envs: Default::default(),
364                connection: None,
365                cwd: None,
366                request_args: StartDebuggingRequestArguments {
367                    configuration: serde_json::Value::Null,
368                    request: dap_types::StartDebuggingRequestArgumentsRequest::Launch,
369                },
370            },
371            Box::new({
372                let called_event_handler = called_event_handler.clone();
373                move |event| {
374                    called_event_handler.store(true, Ordering::SeqCst);
375
376                    assert_eq!(
377                        Message::Event(Box::new(Events::Initialized(
378                            Some(Capabilities::default())
379                        ))),
380                        event
381                    );
382                }
383            }),
384            &mut cx.to_async(),
385        )
386        .await
387        .unwrap();
388
389        cx.run_until_parked();
390
391        client
392            .fake_event(Events::Initialized(Some(Capabilities::default())))
393            .await;
394
395        cx.run_until_parked();
396
397        assert!(
398            called_event_handler.load(std::sync::atomic::Ordering::SeqCst),
399            "Event handler was not called"
400        );
401    }
402
403    #[gpui::test]
404    pub async fn test_calls_event_handler_for_reverse_request(cx: &mut TestAppContext) {
405        init_test(cx);
406
407        let called_event_handler = Arc::new(AtomicBool::new(false));
408
409        let client = DebugAdapterClient::start(
410            crate::client::SessionId(1),
411            DebugAdapterBinary {
412                command: Some("command".into()),
413                arguments: Default::default(),
414                envs: Default::default(),
415                connection: None,
416                cwd: None,
417                request_args: dap_types::StartDebuggingRequestArguments {
418                    configuration: serde_json::Value::Null,
419                    request: dap_types::StartDebuggingRequestArgumentsRequest::Launch,
420                },
421            },
422            Box::new({
423                let called_event_handler = called_event_handler.clone();
424                move |event| {
425                    called_event_handler.store(true, Ordering::SeqCst);
426
427                    assert_eq!(
428                        Message::Request(dap_types::messages::Request {
429                            seq: 1,
430                            command: RunInTerminal::COMMAND.into(),
431                            arguments: Some(json!({
432                                "cwd": "/project/path/src",
433                                "args": ["node", "test.js"],
434                            }))
435                        }),
436                        event
437                    );
438                }
439            }),
440            &mut cx.to_async(),
441        )
442        .await
443        .unwrap();
444
445        cx.run_until_parked();
446
447        client
448            .fake_reverse_request::<RunInTerminal>(RunInTerminalRequestArguments {
449                kind: None,
450                title: None,
451                cwd: "/project/path/src".into(),
452                args: vec!["node".into(), "test.js".into()],
453                env: None,
454                args_can_be_interpreted_by_shell: None,
455            })
456            .await;
457
458        cx.run_until_parked();
459
460        assert!(
461            called_event_handler.load(std::sync::atomic::Ordering::SeqCst),
462            "Event handler was not called"
463        );
464    }
465}