lsp.rs

  1pub use lsp_types::*;
  2
  3use anyhow::{anyhow, Context, Result};
  4use collections::HashMap;
  5use futures::{channel::oneshot, io::BufWriter, AsyncRead, AsyncWrite};
  6use gpui::{executor, AsyncAppContext, Task};
  7use parking_lot::Mutex;
  8use postage::{barrier, prelude::Stream};
  9use serde::{de::DeserializeOwned, Deserialize, Serialize};
 10use serde_json::{json, value::RawValue, Value};
 11use smol::{
 12    channel,
 13    io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
 14    process::{self, Child},
 15};
 16use std::{
 17    future::Future,
 18    io::Write,
 19    path::PathBuf,
 20    str::FromStr,
 21    sync::{
 22        atomic::{AtomicUsize, Ordering::SeqCst},
 23        Arc,
 24    },
 25};
 26use std::{path::Path, process::Stdio};
 27use util::{ResultExt, TryFutureExt};
 28
 29const JSON_RPC_VERSION: &'static str = "2.0";
 30const CONTENT_LEN_HEADER: &'static str = "Content-Length: ";
 31
 32type NotificationHandler = Box<dyn Send + FnMut(Option<usize>, &str, AsyncAppContext)>;
 33type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
 34
 35pub struct LanguageServer {
 36    server_id: usize,
 37    next_id: AtomicUsize,
 38    outbound_tx: channel::Sender<Vec<u8>>,
 39    name: String,
 40    capabilities: ServerCapabilities,
 41    notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
 42    response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
 43    executor: Arc<executor::Background>,
 44    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
 45    output_done_rx: Mutex<Option<barrier::Receiver>>,
 46    root_path: PathBuf,
 47    _server: Option<Child>,
 48}
 49
 50pub struct Subscription {
 51    method: &'static str,
 52    notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
 53}
 54
 55#[derive(Serialize, Deserialize)]
 56struct Request<'a, T> {
 57    jsonrpc: &'a str,
 58    id: usize,
 59    method: &'a str,
 60    params: T,
 61}
 62
 63#[derive(Serialize, Deserialize)]
 64struct AnyResponse<'a> {
 65    id: usize,
 66    #[serde(default)]
 67    error: Option<Error>,
 68    #[serde(borrow)]
 69    result: Option<&'a RawValue>,
 70}
 71
 72#[derive(Serialize)]
 73struct Response<T> {
 74    id: usize,
 75    result: Option<T>,
 76    error: Option<Error>,
 77}
 78
 79#[derive(Serialize, Deserialize)]
 80struct Notification<'a, T> {
 81    #[serde(borrow)]
 82    jsonrpc: &'a str,
 83    #[serde(borrow)]
 84    method: &'a str,
 85    params: T,
 86}
 87
 88#[derive(Deserialize)]
 89struct AnyNotification<'a> {
 90    #[serde(default)]
 91    id: Option<usize>,
 92    #[serde(borrow)]
 93    method: &'a str,
 94    #[serde(borrow)]
 95    params: &'a RawValue,
 96}
 97
 98#[derive(Debug, Serialize, Deserialize)]
 99struct Error {
100    message: String,
101}
102
103impl LanguageServer {
104    pub fn new<T: AsRef<std::ffi::OsStr>>(
105        server_id: usize,
106        binary_path: &Path,
107        args: &[T],
108        root_path: &Path,
109        cx: AsyncAppContext,
110    ) -> Result<Self> {
111        let working_dir = if root_path.is_dir() {
112            root_path
113        } else {
114            root_path.parent().unwrap_or(Path::new("/"))
115        };
116        let mut server = process::Command::new(binary_path)
117            .current_dir(working_dir)
118            .args(args)
119            .stdin(Stdio::piped())
120            .stdout(Stdio::piped())
121            .stderr(Stdio::inherit())
122            .kill_on_drop(true)
123            .spawn()?;
124
125        let stdin = server.stdin.take().unwrap();
126        let stout = server.stdout.take().unwrap();
127
128        let mut server = Self::new_internal(
129            server_id,
130            stdin,
131            stout,
132            Some(server),
133            root_path,
134            cx,
135            |notification| {
136                log::info!(
137                    "unhandled notification {}:\n{}",
138                    notification.method,
139                    serde_json::to_string_pretty(
140                        &Value::from_str(notification.params.get()).unwrap()
141                    )
142                    .unwrap()
143                );
144            },
145        );
146        if let Some(name) = binary_path.file_name() {
147            server.name = name.to_string_lossy().to_string();
148        }
149        Ok(server)
150    }
151
152    fn new_internal<Stdin, Stdout, F>(
153        server_id: usize,
154        stdin: Stdin,
155        stdout: Stdout,
156        server: Option<Child>,
157        root_path: &Path,
158        cx: AsyncAppContext,
159        mut on_unhandled_notification: F,
160    ) -> Self
161    where
162        Stdin: AsyncWrite + Unpin + Send + 'static,
163        Stdout: AsyncRead + Unpin + Send + 'static,
164        F: FnMut(AnyNotification) + 'static + Send,
165    {
166        let mut stdin = BufWriter::new(stdin);
167        let mut stdout = BufReader::new(stdout);
168        let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
169        let notification_handlers =
170            Arc::new(Mutex::new(HashMap::<_, NotificationHandler>::default()));
171        let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::default()));
172        let input_task = cx.spawn(|cx| {
173            let notification_handlers = notification_handlers.clone();
174            let response_handlers = response_handlers.clone();
175            async move {
176                let _clear_response_handlers = ClearResponseHandlers(response_handlers.clone());
177                let mut buffer = Vec::new();
178                loop {
179                    buffer.clear();
180                    stdout.read_until(b'\n', &mut buffer).await?;
181                    stdout.read_until(b'\n', &mut buffer).await?;
182                    let message_len: usize = std::str::from_utf8(&buffer)?
183                        .strip_prefix(CONTENT_LEN_HEADER)
184                        .ok_or_else(|| anyhow!("invalid header"))?
185                        .trim_end()
186                        .parse()?;
187
188                    buffer.resize(message_len, 0);
189                    stdout.read_exact(&mut buffer).await?;
190                    log::trace!("incoming message:{}", String::from_utf8_lossy(&buffer));
191
192                    if let Ok(msg) = serde_json::from_slice::<AnyNotification>(&buffer) {
193                        if let Some(handler) = notification_handlers.lock().get_mut(msg.method) {
194                            handler(msg.id, msg.params.get(), cx.clone());
195                        } else {
196                            on_unhandled_notification(msg);
197                        }
198                    } else if let Ok(AnyResponse { id, error, result }) =
199                        serde_json::from_slice(&buffer)
200                    {
201                        if let Some(handler) = response_handlers.lock().remove(&id) {
202                            if let Some(error) = error {
203                                handler(Err(error));
204                            } else if let Some(result) = result {
205                                handler(Ok(result.get()));
206                            } else {
207                                handler(Ok("null"));
208                            }
209                        }
210                    } else {
211                        return Err(anyhow!(
212                            "failed to deserialize message:\n{}",
213                            std::str::from_utf8(&buffer)?
214                        ));
215                    }
216
217                    // Don't starve the main thread when receiving lots of messages at once.
218                    smol::future::yield_now().await;
219                }
220            }
221            .log_err()
222        });
223        let (output_done_tx, output_done_rx) = barrier::channel();
224        let output_task = cx.background().spawn({
225            let response_handlers = response_handlers.clone();
226            async move {
227                let _clear_response_handlers = ClearResponseHandlers(response_handlers);
228                let mut content_len_buffer = Vec::new();
229                while let Ok(message) = outbound_rx.recv().await {
230                    log::trace!("outgoing message:{}", String::from_utf8_lossy(&message));
231                    content_len_buffer.clear();
232                    write!(content_len_buffer, "{}", message.len()).unwrap();
233                    stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
234                    stdin.write_all(&content_len_buffer).await?;
235                    stdin.write_all("\r\n\r\n".as_bytes()).await?;
236                    stdin.write_all(&message).await?;
237                    stdin.flush().await?;
238                }
239                drop(output_done_tx);
240                Ok(())
241            }
242            .log_err()
243        });
244
245        Self {
246            server_id,
247            notification_handlers,
248            response_handlers,
249            name: Default::default(),
250            capabilities: Default::default(),
251            next_id: Default::default(),
252            outbound_tx,
253            executor: cx.background().clone(),
254            io_tasks: Mutex::new(Some((input_task, output_task))),
255            output_done_rx: Mutex::new(Some(output_done_rx)),
256            root_path: root_path.to_path_buf(),
257            _server: server,
258        }
259    }
260
261    /// Initializes a language server.
262    /// Note that `options` is used directly to construct [`InitializeParams`],
263    /// which is why it is owned.
264    pub async fn initialize(mut self, options: Option<Value>) -> Result<Arc<Self>> {
265        let root_uri = Url::from_file_path(&self.root_path).unwrap();
266        #[allow(deprecated)]
267        let params = InitializeParams {
268            process_id: Default::default(),
269            root_path: Default::default(),
270            root_uri: Some(root_uri.clone()),
271            initialization_options: options,
272            capabilities: ClientCapabilities {
273                workspace: Some(WorkspaceClientCapabilities {
274                    configuration: Some(true),
275                    did_change_configuration: Some(DynamicRegistrationClientCapabilities {
276                        dynamic_registration: Some(true),
277                    }),
278                    ..Default::default()
279                }),
280                text_document: Some(TextDocumentClientCapabilities {
281                    definition: Some(GotoCapability {
282                        link_support: Some(true),
283                        ..Default::default()
284                    }),
285                    code_action: Some(CodeActionClientCapabilities {
286                        code_action_literal_support: Some(CodeActionLiteralSupport {
287                            code_action_kind: CodeActionKindLiteralSupport {
288                                value_set: vec![
289                                    CodeActionKind::REFACTOR.as_str().into(),
290                                    CodeActionKind::QUICKFIX.as_str().into(),
291                                    CodeActionKind::SOURCE.as_str().into(),
292                                ],
293                            },
294                        }),
295                        data_support: Some(true),
296                        resolve_support: Some(CodeActionCapabilityResolveSupport {
297                            properties: vec!["edit".to_string(), "command".to_string()],
298                        }),
299                        ..Default::default()
300                    }),
301                    completion: Some(CompletionClientCapabilities {
302                        completion_item: Some(CompletionItemCapability {
303                            snippet_support: Some(true),
304                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
305                                properties: vec!["additionalTextEdits".to_string()],
306                            }),
307                            ..Default::default()
308                        }),
309                        ..Default::default()
310                    }),
311                    rename: Some(RenameClientCapabilities {
312                        prepare_support: Some(true),
313                        ..Default::default()
314                    }),
315                    hover: Some(HoverClientCapabilities {
316                        content_format: Some(vec![MarkupKind::Markdown]),
317                        ..Default::default()
318                    }),
319                    ..Default::default()
320                }),
321                experimental: Some(json!({
322                    "serverStatusNotification": true,
323                })),
324                window: Some(WindowClientCapabilities {
325                    work_done_progress: Some(true),
326                    ..Default::default()
327                }),
328                ..Default::default()
329            },
330            trace: Default::default(),
331            workspace_folders: Some(vec![WorkspaceFolder {
332                uri: root_uri,
333                name: Default::default(),
334            }]),
335            client_info: Default::default(),
336            locale: Default::default(),
337        };
338
339        let response = self.request::<request::Initialize>(params).await?;
340        if let Some(info) = response.server_info {
341            self.name = info.name;
342        }
343        self.capabilities = response.capabilities;
344
345        self.notify::<notification::Initialized>(InitializedParams {})?;
346        Ok(Arc::new(self))
347    }
348
349    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
350        if let Some(tasks) = self.io_tasks.lock().take() {
351            let response_handlers = self.response_handlers.clone();
352            let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
353            let outbound_tx = self.outbound_tx.clone();
354            let mut output_done = self.output_done_rx.lock().take().unwrap();
355            let shutdown_request = Self::request_internal::<request::Shutdown>(
356                &next_id,
357                &response_handlers,
358                &outbound_tx,
359                (),
360            );
361            let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
362            outbound_tx.close();
363            Some(
364                async move {
365                    log::debug!("language server shutdown started");
366                    shutdown_request.await?;
367                    response_handlers.lock().clear();
368                    exit?;
369                    output_done.recv().await;
370                    log::debug!("language server shutdown finished");
371                    drop(tasks);
372                    Ok(())
373                }
374                .log_err(),
375            )
376        } else {
377            None
378        }
379    }
380
381    #[must_use]
382    pub fn on_notification<T, F>(&self, f: F) -> Subscription
383    where
384        T: notification::Notification,
385        F: 'static + Send + FnMut(T::Params, AsyncAppContext),
386    {
387        self.on_custom_notification(T::METHOD, f)
388    }
389
390    #[must_use]
391    pub fn on_request<T, F, Fut>(&self, f: F) -> Subscription
392    where
393        T: request::Request,
394        T::Params: 'static + Send,
395        F: 'static + Send + FnMut(T::Params, AsyncAppContext) -> Fut,
396        Fut: 'static + Future<Output = Result<T::Result>>,
397    {
398        self.on_custom_request(T::METHOD, f)
399    }
400
401    pub fn remove_request_handler<T: request::Request>(&self) {
402        self.notification_handlers.lock().remove(T::METHOD);
403    }
404
405    #[must_use]
406    pub fn on_custom_notification<Params, F>(&self, method: &'static str, mut f: F) -> Subscription
407    where
408        F: 'static + Send + FnMut(Params, AsyncAppContext),
409        Params: DeserializeOwned,
410    {
411        let prev_handler = self.notification_handlers.lock().insert(
412            method,
413            Box::new(move |_, params, cx| {
414                if let Some(params) = serde_json::from_str(params).log_err() {
415                    f(params, cx);
416                }
417            }),
418        );
419        assert!(
420            prev_handler.is_none(),
421            "registered multiple handlers for the same LSP method"
422        );
423        Subscription {
424            method,
425            notification_handlers: self.notification_handlers.clone(),
426        }
427    }
428
429    #[must_use]
430    pub fn on_custom_request<Params, Res, Fut, F>(
431        &self,
432        method: &'static str,
433        mut f: F,
434    ) -> Subscription
435    where
436        F: 'static + Send + FnMut(Params, AsyncAppContext) -> Fut,
437        Fut: 'static + Future<Output = Result<Res>>,
438        Params: DeserializeOwned + Send + 'static,
439        Res: Serialize,
440    {
441        let outbound_tx = self.outbound_tx.clone();
442        let prev_handler = self.notification_handlers.lock().insert(
443            method,
444            Box::new(move |id, params, cx| {
445                if let Some(id) = id {
446                    if let Some(params) = serde_json::from_str(params).log_err() {
447                        let response = f(params, cx.clone());
448                        cx.foreground()
449                            .spawn({
450                                let outbound_tx = outbound_tx.clone();
451                                async move {
452                                    let response = match response.await {
453                                        Ok(result) => Response {
454                                            id,
455                                            result: Some(result),
456                                            error: None,
457                                        },
458                                        Err(error) => Response {
459                                            id,
460                                            result: None,
461                                            error: Some(Error {
462                                                message: error.to_string(),
463                                            }),
464                                        },
465                                    };
466                                    if let Some(response) = serde_json::to_vec(&response).log_err()
467                                    {
468                                        outbound_tx.try_send(response).ok();
469                                    }
470                                }
471                            })
472                            .detach();
473                    }
474                }
475            }),
476        );
477        assert!(
478            prev_handler.is_none(),
479            "registered multiple handlers for the same LSP method"
480        );
481        Subscription {
482            method,
483            notification_handlers: self.notification_handlers.clone(),
484        }
485    }
486
487    pub fn name<'a>(self: &'a Arc<Self>) -> &'a str {
488        &self.name
489    }
490
491    pub fn capabilities<'a>(self: &'a Arc<Self>) -> &'a ServerCapabilities {
492        &self.capabilities
493    }
494
495    pub fn server_id(&self) -> usize {
496        self.server_id
497    }
498
499    pub fn root_path(&self) -> &PathBuf {
500        &self.root_path
501    }
502
503    pub fn request<T: request::Request>(
504        &self,
505        params: T::Params,
506    ) -> impl Future<Output = Result<T::Result>>
507    where
508        T::Result: 'static + Send,
509    {
510        Self::request_internal::<T>(
511            &self.next_id,
512            &self.response_handlers,
513            &self.outbound_tx,
514            params,
515        )
516    }
517
518    fn request_internal<T: request::Request>(
519        next_id: &AtomicUsize,
520        response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
521        outbound_tx: &channel::Sender<Vec<u8>>,
522        params: T::Params,
523    ) -> impl 'static + Future<Output = Result<T::Result>>
524    where
525        T::Result: 'static + Send,
526    {
527        let id = next_id.fetch_add(1, SeqCst);
528        let message = serde_json::to_vec(&Request {
529            jsonrpc: JSON_RPC_VERSION,
530            id,
531            method: T::METHOD,
532            params,
533        })
534        .unwrap();
535
536        let send = outbound_tx
537            .try_send(message)
538            .context("failed to write to language server's stdin");
539
540        let (tx, rx) = oneshot::channel();
541        response_handlers.lock().insert(
542            id,
543            Box::new(move |result| {
544                let response = match result {
545                    Ok(response) => {
546                        serde_json::from_str(response).context("failed to deserialize response")
547                    }
548                    Err(error) => Err(anyhow!("{}", error.message)),
549                };
550                let _ = tx.send(response);
551            }),
552        );
553
554        async move {
555            send?;
556            rx.await?
557        }
558    }
559
560    pub fn notify<T: notification::Notification>(&self, params: T::Params) -> Result<()> {
561        Self::notify_internal::<T>(&self.outbound_tx, params)
562    }
563
564    fn notify_internal<T: notification::Notification>(
565        outbound_tx: &channel::Sender<Vec<u8>>,
566        params: T::Params,
567    ) -> Result<()> {
568        let message = serde_json::to_vec(&Notification {
569            jsonrpc: JSON_RPC_VERSION,
570            method: T::METHOD,
571            params,
572        })
573        .unwrap();
574        outbound_tx.try_send(message)?;
575        Ok(())
576    }
577}
578
579impl Drop for LanguageServer {
580    fn drop(&mut self) {
581        if let Some(shutdown) = self.shutdown() {
582            self.executor.spawn(shutdown).detach();
583        }
584    }
585}
586
587impl Subscription {
588    pub fn detach(mut self) {
589        self.method = "";
590    }
591}
592
593impl Drop for Subscription {
594    fn drop(&mut self) {
595        self.notification_handlers.lock().remove(self.method);
596    }
597}
598
599#[cfg(any(test, feature = "test-support"))]
600#[derive(Clone)]
601pub struct FakeLanguageServer {
602    pub server: Arc<LanguageServer>,
603    notifications_rx: channel::Receiver<(String, String)>,
604}
605
606#[cfg(any(test, feature = "test-support"))]
607impl LanguageServer {
608    pub fn full_capabilities() -> ServerCapabilities {
609        ServerCapabilities {
610            document_highlight_provider: Some(OneOf::Left(true)),
611            code_action_provider: Some(CodeActionProviderCapability::Simple(true)),
612            document_formatting_provider: Some(OneOf::Left(true)),
613            document_range_formatting_provider: Some(OneOf::Left(true)),
614            ..Default::default()
615        }
616    }
617
618    pub fn fake(
619        name: String,
620        capabilities: ServerCapabilities,
621        cx: AsyncAppContext,
622    ) -> (Self, FakeLanguageServer) {
623        let (stdin_writer, stdin_reader) = async_pipe::pipe();
624        let (stdout_writer, stdout_reader) = async_pipe::pipe();
625        let (notifications_tx, notifications_rx) = channel::unbounded();
626
627        let server = Self::new_internal(
628            0,
629            stdin_writer,
630            stdout_reader,
631            None,
632            Path::new("/"),
633            cx.clone(),
634            |_| {},
635        );
636        let fake = FakeLanguageServer {
637            server: Arc::new(Self::new_internal(
638                0,
639                stdout_writer,
640                stdin_reader,
641                None,
642                Path::new("/"),
643                cx.clone(),
644                move |msg| {
645                    notifications_tx
646                        .try_send((msg.method.to_string(), msg.params.get().to_string()))
647                        .ok();
648                },
649            )),
650            notifications_rx,
651        };
652        fake.handle_request::<request::Initialize, _, _>({
653            let capabilities = capabilities.clone();
654            move |_, _| {
655                let capabilities = capabilities.clone();
656                let name = name.clone();
657                async move {
658                    Ok(InitializeResult {
659                        capabilities,
660                        server_info: Some(ServerInfo {
661                            name,
662                            ..Default::default()
663                        }),
664                        ..Default::default()
665                    })
666                }
667            }
668        });
669
670        (server, fake)
671    }
672}
673
674#[cfg(any(test, feature = "test-support"))]
675impl FakeLanguageServer {
676    pub fn notify<T: notification::Notification>(&self, params: T::Params) {
677        self.server.notify::<T>(params).ok();
678    }
679
680    pub async fn request<T>(&self, params: T::Params) -> Result<T::Result>
681    where
682        T: request::Request,
683        T::Result: 'static + Send,
684    {
685        self.server.request::<T>(params).await
686    }
687
688    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
689        self.try_receive_notification::<T>().await.unwrap()
690    }
691
692    pub async fn try_receive_notification<T: notification::Notification>(
693        &mut self,
694    ) -> Option<T::Params> {
695        use futures::StreamExt as _;
696
697        loop {
698            let (method, params) = self.notifications_rx.next().await?;
699            if &method == T::METHOD {
700                return Some(serde_json::from_str::<T::Params>(&params).unwrap());
701            } else {
702                log::info!("skipping message in fake language server {:?}", params);
703            }
704        }
705    }
706
707    pub fn handle_request<T, F, Fut>(
708        &self,
709        mut handler: F,
710    ) -> futures::channel::mpsc::UnboundedReceiver<()>
711    where
712        T: 'static + request::Request,
713        T::Params: 'static + Send,
714        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> Fut,
715        Fut: 'static + Send + Future<Output = Result<T::Result>>,
716    {
717        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
718        self.server.remove_request_handler::<T>();
719        self.server
720            .on_request::<T, _, _>(move |params, cx| {
721                let result = handler(params, cx.clone());
722                let responded_tx = responded_tx.clone();
723                async move {
724                    cx.background().simulate_random_delay().await;
725                    let result = result.await;
726                    responded_tx.unbounded_send(()).ok();
727                    result
728                }
729            })
730            .detach();
731        responded_rx
732    }
733
734    pub fn remove_request_handler<T>(&mut self)
735    where
736        T: 'static + request::Request,
737    {
738        self.server.remove_request_handler::<T>();
739    }
740
741    pub async fn start_progress(&self, token: impl Into<String>) {
742        let token = token.into();
743        self.request::<request::WorkDoneProgressCreate>(WorkDoneProgressCreateParams {
744            token: NumberOrString::String(token.clone()),
745        })
746        .await
747        .unwrap();
748        self.notify::<notification::Progress>(ProgressParams {
749            token: NumberOrString::String(token),
750            value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
751        });
752    }
753
754    pub fn end_progress(&self, token: impl Into<String>) {
755        self.notify::<notification::Progress>(ProgressParams {
756            token: NumberOrString::String(token.into()),
757            value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
758        });
759    }
760}
761
762struct ClearResponseHandlers(Arc<Mutex<HashMap<usize, ResponseHandler>>>);
763
764impl Drop for ClearResponseHandlers {
765    fn drop(&mut self) {
766        self.0.lock().clear();
767    }
768}
769
770#[cfg(test)]
771mod tests {
772    use super::*;
773    use gpui::TestAppContext;
774
775    #[ctor::ctor]
776    fn init_logger() {
777        if std::env::var("RUST_LOG").is_ok() {
778            env_logger::init();
779        }
780    }
781
782    #[gpui::test]
783    async fn test_fake(cx: &mut TestAppContext) {
784        let (server, mut fake) =
785            LanguageServer::fake("the-lsp".to_string(), Default::default(), cx.to_async());
786
787        let (message_tx, message_rx) = channel::unbounded();
788        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
789        server
790            .on_notification::<notification::ShowMessage, _>(move |params, _| {
791                message_tx.try_send(params).unwrap()
792            })
793            .detach();
794        server
795            .on_notification::<notification::PublishDiagnostics, _>(move |params, _| {
796                diagnostics_tx.try_send(params).unwrap()
797            })
798            .detach();
799
800        let server = server.initialize(None).await.unwrap();
801        server
802            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
803                text_document: TextDocumentItem::new(
804                    Url::from_str("file://a/b").unwrap(),
805                    "rust".to_string(),
806                    0,
807                    "".to_string(),
808                ),
809            })
810            .unwrap();
811        assert_eq!(
812            fake.receive_notification::<notification::DidOpenTextDocument>()
813                .await
814                .text_document
815                .uri
816                .as_str(),
817            "file://a/b"
818        );
819
820        fake.notify::<notification::ShowMessage>(ShowMessageParams {
821            typ: MessageType::ERROR,
822            message: "ok".to_string(),
823        });
824        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
825            uri: Url::from_str("file://b/c").unwrap(),
826            version: Some(5),
827            diagnostics: vec![],
828        });
829        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
830        assert_eq!(
831            diagnostics_rx.recv().await.unwrap().uri.as_str(),
832            "file://b/c"
833        );
834
835        fake.handle_request::<request::Shutdown, _, _>(|_, _| async move { Ok(()) });
836
837        drop(server);
838        fake.receive_notification::<notification::Exit>().await;
839    }
840}