lsp.rs

  1pub use lsp_types::request::*;
  2pub use lsp_types::*;
  3
  4use anyhow::{anyhow, Context, Result};
  5use collections::HashMap;
  6use futures::{channel::oneshot, io::BufWriter, AsyncRead, AsyncWrite};
  7use gpui::{executor, AsyncAppContext, Task};
  8use parking_lot::Mutex;
  9use postage::{barrier, prelude::Stream};
 10use serde::{de::DeserializeOwned, Deserialize, Serialize};
 11use serde_json::{json, value::RawValue, Value};
 12use smol::{
 13    channel,
 14    io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
 15    process::{self, Child},
 16};
 17use std::{
 18    future::Future,
 19    io::Write,
 20    path::PathBuf,
 21    str::FromStr,
 22    sync::{
 23        atomic::{AtomicUsize, Ordering::SeqCst},
 24        Arc,
 25    },
 26};
 27use std::{path::Path, process::Stdio};
 28use util::{ResultExt, TryFutureExt};
 29
/// Protocol version string written into the `jsonrpc` field of outgoing requests
/// and notifications.
const JSON_RPC_VERSION: &str = "2.0";
/// Header prefix that precedes the byte length of each framed message, both when
/// parsing incoming messages and when writing outgoing ones.
const CONTENT_LEN_HEADER: &str = "Content-Length: ";
 32
/// Callback invoked for an incoming message whose `method` has a registered handler.
/// It receives the message's optional request id, the raw JSON text of its params,
/// and an async app context.
type NotificationHandler = Box<dyn Send + FnMut(Option<usize>, &str, AsyncAppContext)>;
/// One-shot callback invoked with either the raw JSON text of a response's result
/// or the error reported for a request that was previously sent.
type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
 35
/// A connection to a language server that exchanges `Content-Length`-framed
/// JSON-RPC messages over a pair of byte streams.
pub struct LanguageServer {
    /// Identifier supplied by the caller at construction time.
    server_id: usize,
    /// Id that will be assigned to the next outgoing request.
    next_id: AtomicUsize,
    /// Queue of serialized messages consumed by the output task.
    outbound_tx: channel::Sender<Vec<u8>>,
    /// Set to the binary's file name by `new`, and replaced during `initialize`
    /// when the server reports its own name.
    name: String,
    /// Capabilities reported by the server in its `initialize` response.
    capabilities: ServerCapabilities,
    /// Handlers for incoming notifications and server-initiated requests, keyed by method.
    notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
    /// Handlers awaiting responses to requests we sent, keyed by request id.
    response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
    /// Background executor; used on drop to run the shutdown future.
    executor: Arc<executor::Background>,
    /// The input and output tasks; taken (and later dropped) by `shutdown`.
    #[allow(clippy::type_complexity)]
    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
    /// Barrier receiver awaited by `shutdown`; its sender is owned by the output task.
    output_done_rx: Mutex<Option<barrier::Receiver>>,
    /// Root path passed at construction; used to build the root URI in `initialize`.
    root_path: PathBuf,
    /// Handle to the child process, if this server was spawned by `new`
    /// (which configures it with `kill_on_drop(true)`).
    _server: Option<Child>,
}
 51
/// Handle for a registered notification/request handler. Dropping it removes the
/// handler registered under `method`, unless `detach` was called first.
pub struct Subscription {
    /// Method name the handler was registered under.
    method: &'static str,
    /// Shared handler map from which the handler is removed on drop.
    notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
}
 56
/// An outgoing JSON-RPC request, generic over the type of its params.
#[derive(Serialize, Deserialize)]
struct Request<'a, T> {
    /// Protocol version; set to `JSON_RPC_VERSION` when sending.
    jsonrpc: &'a str,
    /// Id used to match the eventual response to this request.
    id: usize,
    /// Name of the method being invoked.
    method: &'a str,
    /// Method-specific parameters.
    params: T,
}
 64
/// An incoming response whose result is kept as raw, undecoded JSON borrowed from
/// the input buffer, so it can be decoded later by the matching response handler.
#[derive(Serialize, Deserialize)]
struct AnyResponse<'a> {
    /// Id of the request this message responds to.
    id: usize,
    /// Error reported for the request, if any; defaults to `None` when absent.
    #[serde(default)]
    error: Option<Error>,
    /// Raw JSON of the result, if present.
    #[serde(borrow)]
    result: Option<&'a RawValue>,
}
 73
 74#[derive(Serialize)]
 75struct Response<T> {
 76    id: usize,
 77    result: Option<T>,
 78    error: Option<Error>,
 79}
 80
/// An outgoing JSON-RPC notification (a message without an id), generic over the
/// type of its params.
#[derive(Serialize, Deserialize)]
struct Notification<'a, T> {
    /// Protocol version; set to `JSON_RPC_VERSION` when sending.
    #[serde(borrow)]
    jsonrpc: &'a str,
    /// Name of the notification method.
    #[serde(borrow)]
    method: &'a str,
    /// Method-specific parameters.
    params: T,
}
 89
/// An incoming message that names a `method`, with its params kept as raw JSON
/// borrowed from the input buffer. Despite the name, this also matches
/// server-initiated requests, which are distinguished by carrying an `id`.
///
/// NOTE(review): `params` has no `#[serde(default)]`, so a message that omits
/// `params` entirely presumably fails to deserialize as this type — confirm.
#[derive(Deserialize)]
struct AnyNotification<'a> {
    /// Request id; `None` when the message does not carry one.
    #[serde(default)]
    id: Option<usize>,
    /// Name of the method used to look up a handler.
    #[serde(borrow)]
    method: &'a str,
    /// Raw JSON of the params, decoded later by the handler.
    #[serde(borrow)]
    params: &'a RawValue,
}
 99
/// The error member of a JSON-RPC response. Only the human-readable message is
/// modeled here.
#[derive(Debug, Serialize, Deserialize)]
struct Error {
    /// Description of what went wrong.
    message: String,
}
104
105impl LanguageServer {
106    pub fn new<T: AsRef<std::ffi::OsStr>>(
107        server_id: usize,
108        binary_path: &Path,
109        args: &[T],
110        root_path: &Path,
111        cx: AsyncAppContext,
112    ) -> Result<Self> {
113        let working_dir = if root_path.is_dir() {
114            root_path
115        } else {
116            root_path.parent().unwrap_or_else(|| Path::new("/"))
117        };
118        let mut server = process::Command::new(binary_path)
119            .current_dir(working_dir)
120            .args(args)
121            .stdin(Stdio::piped())
122            .stdout(Stdio::piped())
123            .stderr(Stdio::inherit())
124            .kill_on_drop(true)
125            .spawn()?;
126
127        let stdin = server.stdin.take().unwrap();
128        let stout = server.stdout.take().unwrap();
129
130        let mut server = Self::new_internal(
131            server_id,
132            stdin,
133            stout,
134            Some(server),
135            root_path,
136            cx,
137            |notification| {
138                log::info!(
139                    "unhandled notification {}:\n{}",
140                    notification.method,
141                    serde_json::to_string_pretty(
142                        &Value::from_str(notification.params.get()).unwrap()
143                    )
144                    .unwrap()
145                );
146            },
147        );
148        if let Some(name) = binary_path.file_name() {
149            server.name = name.to_string_lossy().to_string();
150        }
151        Ok(server)
152    }
153
    /// Builds a `LanguageServer` on top of an arbitrary pair of byte streams and
    /// starts the two tasks that drive it.
    ///
    /// * The input task reads framed messages from `stdout` and dispatches each one
    ///   to a registered notification/request handler, to the pending response
    ///   handler with the matching id, or to `on_unhandled_notification`.
    /// * The output task writes every message queued on the outbound channel to
    ///   `stdin`, preceded by a `Content-Length` header.
    ///
    /// `server` is the child process handle to keep alive alongside the connection,
    /// if there is one. The returned server has a default (empty) name and default
    /// capabilities.
    fn new_internal<Stdin, Stdout, F>(
        server_id: usize,
        stdin: Stdin,
        stdout: Stdout,
        server: Option<Child>,
        root_path: &Path,
        cx: AsyncAppContext,
        mut on_unhandled_notification: F,
    ) -> Self
    where
        Stdin: AsyncWrite + Unpin + Send + 'static,
        Stdout: AsyncRead + Unpin + Send + 'static,
        F: FnMut(AnyNotification) + 'static + Send,
    {
        let mut stdin = BufWriter::new(stdin);
        let mut stdout = BufReader::new(stdout);
        let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
        let notification_handlers =
            Arc::new(Mutex::new(HashMap::<_, NotificationHandler>::default()));
        let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::default()));
        let input_task = cx.spawn(|cx| {
            let notification_handlers = notification_handlers.clone();
            let response_handlers = response_handlers.clone();
            async move {
                // Clears all pending response handlers when this task ends for any reason.
                let _clear_response_handlers = ClearResponseHandlers(response_handlers.clone());
                let mut buffer = Vec::new();
                loop {
                    buffer.clear();
                    // Read two `\n`-terminated lines into the same buffer: the
                    // `Content-Length` header and the line that follows it.
                    // NOTE(review): this assumes exactly one header line per
                    // message — confirm no server sends additional headers.
                    stdout.read_until(b'\n', &mut buffer).await?;
                    stdout.read_until(b'\n', &mut buffer).await?;
                    let message_len: usize = std::str::from_utf8(&buffer)?
                        .strip_prefix(CONTENT_LEN_HEADER)
                        .ok_or_else(|| anyhow!("invalid header"))?
                        .trim_end()
                        .parse()?;

                    // Reuse the buffer for the message body of the announced length.
                    buffer.resize(message_len, 0);
                    stdout.read_exact(&mut buffer).await?;
                    log::trace!("incoming message:{}", String::from_utf8_lossy(&buffer));

                    // First try to interpret the message as something carrying a
                    // `method` (a notification or a server-initiated request);
                    // otherwise fall back to treating it as a response.
                    if let Ok(msg) = serde_json::from_slice::<AnyNotification>(&buffer) {
                        if let Some(handler) = notification_handlers.lock().get_mut(msg.method) {
                            handler(msg.id, msg.params.get(), cx.clone());
                        } else {
                            on_unhandled_notification(msg);
                        }
                    } else if let Ok(AnyResponse { id, error, result }) =
                        serde_json::from_slice(&buffer)
                    {
                        // Responses with no registered handler are silently ignored.
                        if let Some(handler) = response_handlers.lock().remove(&id) {
                            if let Some(error) = error {
                                handler(Err(error));
                            } else if let Some(result) = result {
                                handler(Ok(result.get()));
                            } else {
                                // Neither `error` nor `result` present: treat as a null result.
                                handler(Ok("null"));
                            }
                        }
                    } else {
                        // An undecodable message ends the input task with an error.
                        return Err(anyhow!(
                            "failed to deserialize message:\n{}",
                            std::str::from_utf8(&buffer)?
                        ));
                    }

                    // Don't starve the main thread when receiving lots of messages at once.
                    smol::future::yield_now().await;
                }
            }
            .log_err()
        });
        let (output_done_tx, output_done_rx) = barrier::channel();
        let output_task = cx.background().spawn({
            let response_handlers = response_handlers.clone();
            async move {
                // Clears all pending response handlers when this task ends for any reason.
                let _clear_response_handlers = ClearResponseHandlers(response_handlers);
                let mut content_len_buffer = Vec::new();
                // Runs until receiving fails, i.e. the outbound channel is closed and drained.
                while let Ok(message) = outbound_rx.recv().await {
                    log::trace!("outgoing message:{}", String::from_utf8_lossy(&message));
                    content_len_buffer.clear();
                    write!(content_len_buffer, "{}", message.len()).unwrap();
                    // Frame: header prefix, decimal length, blank line, then the body.
                    stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
                    stdin.write_all(&content_len_buffer).await?;
                    stdin.write_all("\r\n\r\n".as_bytes()).await?;
                    stdin.write_all(&message).await?;
                    stdin.flush().await?;
                }
                // Dropping the sender releases whoever is waiting on `output_done_rx`.
                drop(output_done_tx);
                Ok(())
            }
            .log_err()
        });

        Self {
            server_id,
            notification_handlers,
            response_handlers,
            name: Default::default(),
            capabilities: Default::default(),
            next_id: Default::default(),
            outbound_tx,
            executor: cx.background(),
            io_tasks: Mutex::new(Some((input_task, output_task))),
            output_done_rx: Mutex::new(Some(output_done_rx)),
            root_path: root_path.to_path_buf(),
            _server: server,
        }
    }
262
263    /// Initializes a language server.
264    /// Note that `options` is used directly to construct [`InitializeParams`],
265    /// which is why it is owned.
266    pub async fn initialize(mut self, options: Option<Value>) -> Result<Arc<Self>> {
267        let root_uri = Url::from_file_path(&self.root_path).unwrap();
268        #[allow(deprecated)]
269        let params = InitializeParams {
270            process_id: Default::default(),
271            root_path: Default::default(),
272            root_uri: Some(root_uri.clone()),
273            initialization_options: options,
274            capabilities: ClientCapabilities {
275                workspace: Some(WorkspaceClientCapabilities {
276                    configuration: Some(true),
277                    did_change_configuration: Some(DynamicRegistrationClientCapabilities {
278                        dynamic_registration: Some(true),
279                    }),
280                    ..Default::default()
281                }),
282                text_document: Some(TextDocumentClientCapabilities {
283                    definition: Some(GotoCapability {
284                        link_support: Some(true),
285                        ..Default::default()
286                    }),
287                    code_action: Some(CodeActionClientCapabilities {
288                        code_action_literal_support: Some(CodeActionLiteralSupport {
289                            code_action_kind: CodeActionKindLiteralSupport {
290                                value_set: vec![
291                                    CodeActionKind::REFACTOR.as_str().into(),
292                                    CodeActionKind::QUICKFIX.as_str().into(),
293                                    CodeActionKind::SOURCE.as_str().into(),
294                                ],
295                            },
296                        }),
297                        data_support: Some(true),
298                        resolve_support: Some(CodeActionCapabilityResolveSupport {
299                            properties: vec!["edit".to_string(), "command".to_string()],
300                        }),
301                        ..Default::default()
302                    }),
303                    completion: Some(CompletionClientCapabilities {
304                        completion_item: Some(CompletionItemCapability {
305                            snippet_support: Some(true),
306                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
307                                properties: vec!["additionalTextEdits".to_string()],
308                            }),
309                            ..Default::default()
310                        }),
311                        ..Default::default()
312                    }),
313                    rename: Some(RenameClientCapabilities {
314                        prepare_support: Some(true),
315                        ..Default::default()
316                    }),
317                    hover: Some(HoverClientCapabilities {
318                        content_format: Some(vec![MarkupKind::Markdown]),
319                        ..Default::default()
320                    }),
321                    ..Default::default()
322                }),
323                experimental: Some(json!({
324                    "serverStatusNotification": true,
325                })),
326                window: Some(WindowClientCapabilities {
327                    work_done_progress: Some(true),
328                    ..Default::default()
329                }),
330                ..Default::default()
331            },
332            trace: Default::default(),
333            workspace_folders: Some(vec![WorkspaceFolder {
334                uri: root_uri,
335                name: Default::default(),
336            }]),
337            client_info: Default::default(),
338            locale: Default::default(),
339        };
340
341        let response = self.request::<request::Initialize>(params).await?;
342        if let Some(info) = response.server_info {
343            self.name = info.name;
344        }
345        self.capabilities = response.capabilities;
346
347        self.notify::<notification::Initialized>(InitializedParams {})?;
348        Ok(Arc::new(self))
349    }
350
351    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
352        if let Some(tasks) = self.io_tasks.lock().take() {
353            let response_handlers = self.response_handlers.clone();
354            let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
355            let outbound_tx = self.outbound_tx.clone();
356            let mut output_done = self.output_done_rx.lock().take().unwrap();
357            let shutdown_request = Self::request_internal::<request::Shutdown>(
358                &next_id,
359                &response_handlers,
360                &outbound_tx,
361                (),
362            );
363            let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
364            outbound_tx.close();
365            Some(
366                async move {
367                    log::debug!("language server shutdown started");
368                    shutdown_request.await?;
369                    response_handlers.lock().clear();
370                    exit?;
371                    output_done.recv().await;
372                    log::debug!("language server shutdown finished");
373                    drop(tasks);
374                    Ok(())
375                }
376                .log_err(),
377            )
378        } else {
379            None
380        }
381    }
382
383    #[must_use]
384    pub fn on_notification<T, F>(&self, f: F) -> Subscription
385    where
386        T: notification::Notification,
387        F: 'static + Send + FnMut(T::Params, AsyncAppContext),
388    {
389        self.on_custom_notification(T::METHOD, f)
390    }
391
392    #[must_use]
393    pub fn on_request<T, F, Fut>(&self, f: F) -> Subscription
394    where
395        T: request::Request,
396        T::Params: 'static + Send,
397        F: 'static + Send + FnMut(T::Params, AsyncAppContext) -> Fut,
398        Fut: 'static + Future<Output = Result<T::Result>>,
399    {
400        self.on_custom_request(T::METHOD, f)
401    }
402
403    pub fn remove_request_handler<T: request::Request>(&self) {
404        self.notification_handlers.lock().remove(T::METHOD);
405    }
406
407    #[must_use]
408    pub fn on_custom_notification<Params, F>(&self, method: &'static str, mut f: F) -> Subscription
409    where
410        F: 'static + Send + FnMut(Params, AsyncAppContext),
411        Params: DeserializeOwned,
412    {
413        let prev_handler = self.notification_handlers.lock().insert(
414            method,
415            Box::new(move |_, params, cx| {
416                if let Some(params) = serde_json::from_str(params).log_err() {
417                    f(params, cx);
418                }
419            }),
420        );
421        assert!(
422            prev_handler.is_none(),
423            "registered multiple handlers for the same LSP method"
424        );
425        Subscription {
426            method,
427            notification_handlers: self.notification_handlers.clone(),
428        }
429    }
430
431    #[must_use]
432    pub fn on_custom_request<Params, Res, Fut, F>(
433        &self,
434        method: &'static str,
435        mut f: F,
436    ) -> Subscription
437    where
438        F: 'static + Send + FnMut(Params, AsyncAppContext) -> Fut,
439        Fut: 'static + Future<Output = Result<Res>>,
440        Params: DeserializeOwned + Send + 'static,
441        Res: Serialize,
442    {
443        let outbound_tx = self.outbound_tx.clone();
444        let prev_handler = self.notification_handlers.lock().insert(
445            method,
446            Box::new(move |id, params, cx| {
447                if let Some(id) = id {
448                    if let Some(params) = serde_json::from_str(params).log_err() {
449                        let response = f(params, cx.clone());
450                        cx.foreground()
451                            .spawn({
452                                let outbound_tx = outbound_tx.clone();
453                                async move {
454                                    let response = match response.await {
455                                        Ok(result) => Response {
456                                            id,
457                                            result: Some(result),
458                                            error: None,
459                                        },
460                                        Err(error) => Response {
461                                            id,
462                                            result: None,
463                                            error: Some(Error {
464                                                message: error.to_string(),
465                                            }),
466                                        },
467                                    };
468                                    if let Some(response) = serde_json::to_vec(&response).log_err()
469                                    {
470                                        outbound_tx.try_send(response).ok();
471                                    }
472                                }
473                            })
474                            .detach();
475                    }
476                }
477            }),
478        );
479        assert!(
480            prev_handler.is_none(),
481            "registered multiple handlers for the same LSP method"
482        );
483        Subscription {
484            method,
485            notification_handlers: self.notification_handlers.clone(),
486        }
487    }
488
489    pub fn name<'a>(self: &'a Arc<Self>) -> &'a str {
490        &self.name
491    }
492
493    pub fn capabilities<'a>(self: &'a Arc<Self>) -> &'a ServerCapabilities {
494        &self.capabilities
495    }
496
    /// Returns the identifier this server was constructed with.
    pub fn server_id(&self) -> usize {
        self.server_id
    }
500
    /// Returns the root path this server was constructed with.
    pub fn root_path(&self) -> &PathBuf {
        &self.root_path
    }
504
505    pub fn request<T: request::Request>(
506        &self,
507        params: T::Params,
508    ) -> impl Future<Output = Result<T::Result>>
509    where
510        T::Result: 'static + Send,
511    {
512        Self::request_internal::<T>(
513            &self.next_id,
514            &self.response_handlers,
515            &self.outbound_tx,
516            params,
517        )
518    }
519
520    fn request_internal<T: request::Request>(
521        next_id: &AtomicUsize,
522        response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
523        outbound_tx: &channel::Sender<Vec<u8>>,
524        params: T::Params,
525    ) -> impl 'static + Future<Output = Result<T::Result>>
526    where
527        T::Result: 'static + Send,
528    {
529        let id = next_id.fetch_add(1, SeqCst);
530        let message = serde_json::to_vec(&Request {
531            jsonrpc: JSON_RPC_VERSION,
532            id,
533            method: T::METHOD,
534            params,
535        })
536        .unwrap();
537
538        let send = outbound_tx
539            .try_send(message)
540            .context("failed to write to language server's stdin");
541
542        let (tx, rx) = oneshot::channel();
543        response_handlers.lock().insert(
544            id,
545            Box::new(move |result| {
546                let response = match result {
547                    Ok(response) => {
548                        serde_json::from_str(response).context("failed to deserialize response")
549                    }
550                    Err(error) => Err(anyhow!("{}", error.message)),
551                };
552                let _ = tx.send(response);
553            }),
554        );
555
556        async move {
557            send?;
558            rx.await?
559        }
560    }
561
562    pub fn notify<T: notification::Notification>(&self, params: T::Params) -> Result<()> {
563        Self::notify_internal::<T>(&self.outbound_tx, params)
564    }
565
566    fn notify_internal<T: notification::Notification>(
567        outbound_tx: &channel::Sender<Vec<u8>>,
568        params: T::Params,
569    ) -> Result<()> {
570        let message = serde_json::to_vec(&Notification {
571            jsonrpc: JSON_RPC_VERSION,
572            method: T::METHOD,
573            params,
574        })
575        .unwrap();
576        outbound_tx.try_send(message)?;
577        Ok(())
578    }
579}
580
581impl Drop for LanguageServer {
582    fn drop(&mut self) {
583        if let Some(shutdown) = self.shutdown() {
584            self.executor.spawn(shutdown).detach();
585        }
586    }
587}
588
impl Subscription {
    /// Consumes the subscription while leaving its handler registered.
    ///
    /// The stored method name is replaced with the empty string before the value
    /// is dropped, so the removal performed on drop targets the `""` key rather
    /// than the handler's real method name.
    pub fn detach(mut self) {
        self.method = "";
    }
}
594
595impl Drop for Subscription {
596    fn drop(&mut self) {
597        self.notification_handlers.lock().remove(self.method);
598    }
599}
600
/// Test double that plays the server side of the protocol against a
/// [`LanguageServer`] created by `LanguageServer::fake`.
#[cfg(any(test, feature = "test-support"))]
#[derive(Clone)]
pub struct FakeLanguageServer {
    /// The connection endpoint acting as the server.
    pub server: Arc<LanguageServer>,
    /// `(method, params)` pairs for every received message that had no registered handler.
    notifications_rx: channel::Receiver<(String, String)>,
}
607
#[cfg(any(test, feature = "test-support"))]
impl LanguageServer {
    /// Returns capabilities with document highlights, code actions, formatting
    /// and range formatting enabled; everything else is left at its default.
    pub fn full_capabilities() -> ServerCapabilities {
        let enabled = || Some(OneOf::Left(true));
        ServerCapabilities {
            document_highlight_provider: enabled(),
            code_action_provider: Some(CodeActionProviderCapability::Simple(true)),
            document_formatting_provider: enabled(),
            document_range_formatting_provider: enabled(),
            ..Default::default()
        }
    }

    /// Creates a client/fake-server pair connected through in-memory pipes.
    ///
    /// The fake server answers `initialize` with the given `name` and
    /// `capabilities`, and records every message it receives without a
    /// registered handler so tests can inspect it.
    pub fn fake(
        name: String,
        capabilities: ServerCapabilities,
        cx: AsyncAppContext,
    ) -> (Self, FakeLanguageServer) {
        // One pipe per direction: what the client writes, the fake reads, and vice versa.
        let (client_writer, fake_reader) = async_pipe::pipe();
        let (fake_writer, client_reader) = async_pipe::pipe();
        let (notifications_tx, notifications_rx) = channel::unbounded();

        let client = Self::new_internal(
            0,
            client_writer,
            client_reader,
            None,
            Path::new("/"),
            cx.clone(),
            |_| {},
        );
        let fake_endpoint = Self::new_internal(
            0,
            fake_writer,
            fake_reader,
            None,
            Path::new("/"),
            cx,
            move |msg| {
                let entry = (msg.method.to_string(), msg.params.get().to_string());
                notifications_tx.try_send(entry).ok();
            },
        );
        let fake = FakeLanguageServer {
            server: Arc::new(fake_endpoint),
            notifications_rx,
        };

        fake.handle_request::<request::Initialize, _, _>(move |_, _| {
            let result = InitializeResult {
                capabilities: capabilities.clone(),
                server_info: Some(ServerInfo {
                    name: name.clone(),
                    ..Default::default()
                }),
            };
            async move { Ok(result) }
        });

        (client, fake)
    }
}
674
#[cfg(any(test, feature = "test-support"))]
impl FakeLanguageServer {
    /// Sends a notification to the client, ignoring any failure to queue it.
    pub fn notify<T: notification::Notification>(&self, params: T::Params) {
        let _ = self.server.notify::<T>(params);
    }

    /// Sends a request to the client and waits for its result.
    pub async fn request<T>(&self, params: T::Params) -> Result<T::Result>
    where
        T: request::Request,
        T::Result: 'static + Send,
    {
        let response = self.server.request::<T>(params);
        response.await
    }

    /// Waits for the next recorded notification of type `T`, skipping others.
    ///
    /// # Panics
    ///
    /// Panics if the notification stream ends before one arrives.
    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
        let received = self.try_receive_notification::<T>().await;
        received.unwrap()
    }

    /// Waits for the next recorded notification of type `T`, skipping (and
    /// logging) messages with other methods. Returns `None` once the
    /// notification stream ends.
    pub async fn try_receive_notification<T: notification::Notification>(
        &mut self,
    ) -> Option<T::Params> {
        use futures::StreamExt as _;

        while let Some((method, params)) = self.notifications_rx.next().await {
            if method == T::METHOD {
                return Some(serde_json::from_str::<T::Params>(&params).unwrap());
            }
            log::info!("skipping message in fake language server {:?}", params);
        }
        None
    }

    /// Installs `handler` for requests of type `T`, replacing any existing one.
    ///
    /// The returned receiver gets a `()` each time a request has been handled.
    pub fn handle_request<T, F, Fut>(
        &self,
        mut handler: F,
    ) -> futures::channel::mpsc::UnboundedReceiver<()>
    where
        T: 'static + request::Request,
        T::Params: 'static + Send,
        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> Fut,
        Fut: 'static + Send + Future<Output = Result<T::Result>>,
    {
        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
        self.server.remove_request_handler::<T>();

        let subscription = self.server.on_request::<T, _, _>(move |params, cx| {
            let pending = handler(params, cx.clone());
            let responded_tx = responded_tx.clone();
            async move {
                cx.background().simulate_random_delay().await;
                let outcome = pending.await;
                responded_tx.unbounded_send(()).ok();
                outcome
            }
        });
        // Keep the handler installed after this function returns.
        subscription.detach();

        responded_rx
    }

    /// Removes the handler installed for requests of type `T`, if any.
    pub fn remove_request_handler<T>(&mut self)
    where
        T: 'static + request::Request,
    {
        let server = &self.server;
        server.remove_request_handler::<T>();
    }

    /// Asks the client to create a progress token, then reports that work has begun.
    ///
    /// # Panics
    ///
    /// Panics if the client rejects the progress-creation request.
    pub async fn start_progress(&self, token: impl Into<String>) {
        let token = token.into();
        let create = WorkDoneProgressCreateParams {
            token: NumberOrString::String(token.clone()),
        };
        self.request::<request::WorkDoneProgressCreate>(create)
            .await
            .unwrap();

        let begin = ProgressParams {
            token: NumberOrString::String(token),
            value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
        };
        self.notify::<notification::Progress>(begin);
    }

    /// Reports that the work tracked by `token` has ended.
    pub fn end_progress(&self, token: impl Into<String>) {
        let end = ProgressParams {
            token: NumberOrString::String(token.into()),
            value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
        };
        self.notify::<notification::Progress>(end);
    }
}
762
/// Guard that empties the shared response-handler map when it is dropped.
struct ClearResponseHandlers(Arc<Mutex<HashMap<usize, ResponseHandler>>>);
764
765impl Drop for ClearResponseHandlers {
766    fn drop(&mut self) {
767        self.0.lock().clear();
768    }
769}
770
#[cfg(test)]
mod tests {
    use super::*;
    use gpui::TestAppContext;

    /// Initializes `env_logger` at load time, but only when `RUST_LOG` is set.
    #[ctor::ctor]
    fn init_logger() {
        let logging_requested = std::env::var("RUST_LOG").is_ok();
        if logging_requested {
            env_logger::init();
        }
    }

    /// Exercises a round trip between a client and the fake server:
    /// initialization, notifications in both directions, and shutdown on drop.
    #[gpui::test]
    async fn test_fake(cx: &mut TestAppContext) {
        let (client, mut fake) =
            LanguageServer::fake("the-lsp".to_string(), Default::default(), cx.to_async());

        // Forward notifications received by the client into channels we can await.
        let (shown_tx, shown_rx) = channel::unbounded();
        let (published_tx, published_rx) = channel::unbounded();
        client
            .on_notification::<notification::ShowMessage, _>(move |params, _| {
                shown_tx.try_send(params).unwrap()
            })
            .detach();
        client
            .on_notification::<notification::PublishDiagnostics, _>(move |params, _| {
                published_tx.try_send(params).unwrap()
            })
            .detach();

        let client = client.initialize(None).await.unwrap();

        // Client -> server notification.
        let opened_document = TextDocumentItem::new(
            Url::from_str("file://a/b").unwrap(),
            "rust".to_string(),
            0,
            "".to_string(),
        );
        client
            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
                text_document: opened_document,
            })
            .unwrap();
        let received_open = fake
            .receive_notification::<notification::DidOpenTextDocument>()
            .await;
        assert_eq!(received_open.text_document.uri.as_str(), "file://a/b");

        // Server -> client notifications.
        fake.notify::<notification::ShowMessage>(ShowMessageParams {
            typ: MessageType::ERROR,
            message: "ok".to_string(),
        });
        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
            uri: Url::from_str("file://b/c").unwrap(),
            version: Some(5),
            diagnostics: vec![],
        });
        let shown = shown_rx.recv().await.unwrap();
        assert_eq!(shown.message, "ok");
        let published = published_rx.recv().await.unwrap();
        assert_eq!(published.uri.as_str(), "file://b/c");

        // Dropping the client should run the shutdown sequence, ending with `exit`.
        fake.handle_request::<request::Shutdown, _, _>(|_, _| async move { Ok(()) });

        drop(client);
        fake.receive_notification::<notification::Exit>().await;
    }
}