lsp.rs

  1use anyhow::{anyhow, Context, Result};
  2use futures::channel::oneshot;
  3use futures::{io::BufWriter, AsyncRead, AsyncWrite};
  4use gpui::{executor, Task};
  5use parking_lot::{Mutex, RwLock};
  6use postage::{barrier, prelude::Stream, watch};
  7use serde::{Deserialize, Serialize};
  8use serde_json::{json, value::RawValue, Value};
  9use smol::{
 10    channel,
 11    io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
 12    process::Command,
 13};
 14use std::{
 15    collections::HashMap,
 16    future::Future,
 17    io::Write,
 18    str::FromStr,
 19    sync::{
 20        atomic::{AtomicUsize, Ordering::SeqCst},
 21        Arc,
 22    },
 23};
 24use std::{path::Path, process::Stdio};
 25use util::TryFutureExt;
 26
 27pub use lsp_types::*;
 28
 29const JSON_RPC_VERSION: &'static str = "2.0";
 30const CONTENT_LEN_HEADER: &'static str = "Content-Length: ";
 31
 32type NotificationHandler = Box<dyn Send + Sync + FnMut(&str)>;
 33type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
 34
/// Client-side handle to a language server that is spoken to over a pair of byte streams.
pub struct LanguageServer {
    /// Source of request ids; incremented once per request that is sent.
    next_id: AtomicUsize,
    /// Queue of serialized messages that the output task frames and writes to the server.
    outbound_tx: channel::Sender<Vec<u8>>,
    /// Holds `Some(capabilities)` once the `initialize` handshake has succeeded.
    capabilities: watch::Receiver<Option<ServerCapabilities>>,
    /// Handlers for incoming notifications, keyed by method name.
    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
    /// Handlers for responses that are still outstanding, keyed by request id.
    response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
    /// Executor on which the shutdown future is spawned when the server is dropped.
    executor: Arc<executor::Background>,
    /// The input and output tasks, in that order; taken by `shutdown`.
    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
    /// Barrier that `request` and `notify` wait on before sending; its sender is dropped
    /// once the initialization attempt has finished.
    initialized: barrier::Receiver,
    /// Barrier whose sender is dropped when the output task's write loop ends; taken by
    /// `shutdown`.
    output_done_rx: Mutex<Option<barrier::Receiver>>,
}
 46
/// Registration of a notification handler; dropping it removes the handler stored under
/// `method`.
pub struct Subscription {
    /// Method name the handler was registered under.
    method: &'static str,
    /// Shared handler table the entry is removed from on drop.
    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
}
 51
/// Outgoing request envelope with typed `params`.
#[derive(Serialize, Deserialize)]
struct Request<'a, T> {
    jsonrpc: &'a str,
    /// Id that the matching response is looked up by.
    id: usize,
    method: &'a str,
    params: T,
}
 59
/// Incoming request whose `params` are kept as unparsed JSON; used by the fake server to
/// dispatch on `method` before decoding the parameters.
#[cfg(any(test, feature = "test-support"))]
#[derive(Deserialize)]
struct AnyRequest<'a> {
    id: usize,
    #[serde(borrow)]
    jsonrpc: &'a str,
    #[serde(borrow)]
    method: &'a str,
    #[serde(borrow)]
    params: &'a RawValue,
}
 71
/// Response envelope whose `result` is kept as unparsed JSON so it can be decoded later by
/// the handler registered for `id`.
#[derive(Serialize, Deserialize)]
struct AnyResponse<'a> {
    id: usize,
    /// Absent in the JSON means `None`.
    #[serde(default)]
    error: Option<Error>,
    #[serde(borrow)]
    result: Option<&'a RawValue>,
}
 80
/// Notification envelope with typed `params`; notifications carry no id.
#[derive(Serialize, Deserialize)]
struct Notification<'a, T> {
    #[serde(borrow)]
    jsonrpc: &'a str,
    #[serde(borrow)]
    method: &'a str,
    params: T,
}
 89
/// Incoming notification whose `params` are kept as unparsed JSON until a handler for
/// `method` decodes them.
#[derive(Deserialize)]
struct AnyNotification<'a> {
    #[serde(borrow)]
    method: &'a str,
    #[serde(borrow)]
    params: &'a RawValue,
}
 97
/// Error object of a response; only its `message` is modeled here.
#[derive(Debug, Serialize, Deserialize)]
struct Error {
    message: String,
}
102
103impl LanguageServer {
104    pub fn new(
105        binary_path: &Path,
106        root_path: &Path,
107        background: Arc<executor::Background>,
108    ) -> Result<Arc<Self>> {
109        let mut server = Command::new(binary_path)
110            .stdin(Stdio::piped())
111            .stdout(Stdio::piped())
112            .stderr(Stdio::inherit())
113            .spawn()?;
114        let stdin = server.stdin.take().unwrap();
115        let stdout = server.stdout.take().unwrap();
116        Self::new_internal(stdin, stdout, root_path, background)
117    }
118
119    fn new_internal<Stdin, Stdout>(
120        stdin: Stdin,
121        stdout: Stdout,
122        root_path: &Path,
123        executor: Arc<executor::Background>,
124    ) -> Result<Arc<Self>>
125    where
126        Stdin: AsyncWrite + Unpin + Send + 'static,
127        Stdout: AsyncRead + Unpin + Send + 'static,
128    {
129        let mut stdin = BufWriter::new(stdin);
130        let mut stdout = BufReader::new(stdout);
131        let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
132        let notification_handlers = Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::new()));
133        let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::new()));
134        let input_task = executor.spawn(
135            {
136                let notification_handlers = notification_handlers.clone();
137                let response_handlers = response_handlers.clone();
138                async move {
139                    let _clear_response_channels = ClearResponseChannels(response_handlers.clone());
140                    let mut buffer = Vec::new();
141                    loop {
142                        buffer.clear();
143                        stdout.read_until(b'\n', &mut buffer).await?;
144                        stdout.read_until(b'\n', &mut buffer).await?;
145                        let message_len: usize = std::str::from_utf8(&buffer)?
146                            .strip_prefix(CONTENT_LEN_HEADER)
147                            .ok_or_else(|| anyhow!("invalid header"))?
148                            .trim_end()
149                            .parse()?;
150
151                        buffer.resize(message_len, 0);
152                        stdout.read_exact(&mut buffer).await?;
153
154                        if let Ok(AnyNotification { method, params }) =
155                            serde_json::from_slice(&buffer)
156                        {
157                            if let Some(handler) = notification_handlers.write().get_mut(method) {
158                                handler(params.get());
159                            } else {
160                                log::info!(
161                                    "unhandled notification {}:\n{}",
162                                    method,
163                                    serde_json::to_string_pretty(
164                                        &Value::from_str(params.get()).unwrap()
165                                    )
166                                    .unwrap()
167                                );
168                            }
169                        } else if let Ok(AnyResponse { id, error, result }) =
170                            serde_json::from_slice(&buffer)
171                        {
172                            if let Some(handler) = response_handlers.lock().remove(&id) {
173                                if let Some(error) = error {
174                                    handler(Err(error));
175                                } else if let Some(result) = result {
176                                    handler(Ok(result.get()));
177                                } else {
178                                    handler(Ok("null"));
179                                }
180                            }
181                        } else {
182                            return Err(anyhow!(
183                                "failed to deserialize message:\n{}",
184                                std::str::from_utf8(&buffer)?
185                            ));
186                        }
187                    }
188                }
189            }
190            .log_err(),
191        );
192        let (output_done_tx, output_done_rx) = barrier::channel();
193        let output_task = executor.spawn(
194            async move {
195                let mut content_len_buffer = Vec::new();
196                while let Ok(message) = outbound_rx.recv().await {
197                    content_len_buffer.clear();
198                    write!(content_len_buffer, "{}", message.len()).unwrap();
199                    stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
200                    stdin.write_all(&content_len_buffer).await?;
201                    stdin.write_all("\r\n\r\n".as_bytes()).await?;
202                    stdin.write_all(&message).await?;
203                    stdin.flush().await?;
204                }
205                drop(output_done_tx);
206                Ok(())
207            }
208            .log_err(),
209        );
210
211        let (initialized_tx, initialized_rx) = barrier::channel();
212        let (mut capabilities_tx, capabilities_rx) = watch::channel();
213        let this = Arc::new(Self {
214            notification_handlers,
215            response_handlers,
216            capabilities: capabilities_rx,
217            next_id: Default::default(),
218            outbound_tx,
219            executor: executor.clone(),
220            io_tasks: Mutex::new(Some((input_task, output_task))),
221            initialized: initialized_rx,
222            output_done_rx: Mutex::new(Some(output_done_rx)),
223        });
224
225        let root_uri = Url::from_file_path(root_path).map_err(|_| anyhow!("invalid root path"))?;
226        executor
227            .spawn({
228                let this = this.clone();
229                async move {
230                    if let Some(capabilities) = this.init(root_uri).log_err().await {
231                        *capabilities_tx.borrow_mut() = Some(capabilities);
232                    }
233
234                    drop(initialized_tx);
235                }
236            })
237            .detach();
238
239        Ok(this)
240    }
241
242    async fn init(self: Arc<Self>, root_uri: Url) -> Result<ServerCapabilities> {
243        #[allow(deprecated)]
244        let params = InitializeParams {
245            process_id: Default::default(),
246            root_path: Default::default(),
247            root_uri: Some(root_uri),
248            initialization_options: Default::default(),
249            capabilities: ClientCapabilities {
250                text_document: Some(TextDocumentClientCapabilities {
251                    definition: Some(GotoCapability {
252                        link_support: Some(true),
253                        ..Default::default()
254                    }),
255                    code_action: Some(CodeActionClientCapabilities {
256                        code_action_literal_support: Some(CodeActionLiteralSupport {
257                            code_action_kind: CodeActionKindLiteralSupport {
258                                value_set: vec![
259                                    CodeActionKind::REFACTOR.as_str().into(),
260                                    CodeActionKind::QUICKFIX.as_str().into(),
261                                ],
262                            },
263                        }),
264                        data_support: Some(true),
265                        resolve_support: Some(CodeActionCapabilityResolveSupport {
266                            properties: vec!["edit".to_string()],
267                        }),
268                        ..Default::default()
269                    }),
270                    completion: Some(CompletionClientCapabilities {
271                        completion_item: Some(CompletionItemCapability {
272                            snippet_support: Some(true),
273                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
274                                properties: vec!["additionalTextEdits".to_string()],
275                            }),
276                            ..Default::default()
277                        }),
278                        ..Default::default()
279                    }),
280                    ..Default::default()
281                }),
282                experimental: Some(json!({
283                    "serverStatusNotification": true,
284                })),
285                window: Some(WindowClientCapabilities {
286                    work_done_progress: Some(true),
287                    ..Default::default()
288                }),
289                ..Default::default()
290            },
291            trace: Default::default(),
292            workspace_folders: Default::default(),
293            client_info: Default::default(),
294            locale: Default::default(),
295        };
296
297        let this = self.clone();
298        let request = Self::request_internal::<request::Initialize>(
299            &this.next_id,
300            &this.response_handlers,
301            &this.outbound_tx,
302            params,
303        );
304        let response = request.await?;
305        Self::notify_internal::<notification::Initialized>(
306            &this.outbound_tx,
307            InitializedParams {},
308        )?;
309        Ok(response.capabilities)
310    }
311
312    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
313        if let Some(tasks) = self.io_tasks.lock().take() {
314            let response_handlers = self.response_handlers.clone();
315            let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
316            let outbound_tx = self.outbound_tx.clone();
317            let mut output_done = self.output_done_rx.lock().take().unwrap();
318            let shutdown_request = Self::request_internal::<request::Shutdown>(
319                &next_id,
320                &response_handlers,
321                &outbound_tx,
322                (),
323            );
324            let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
325            outbound_tx.close();
326            Some(
327                async move {
328                    log::debug!("language server shutdown started");
329                    shutdown_request.await?;
330                    response_handlers.lock().clear();
331                    exit?;
332                    output_done.recv().await;
333                    log::debug!("language server shutdown finished");
334                    drop(tasks);
335                    Ok(())
336                }
337                .log_err(),
338            )
339        } else {
340            None
341        }
342    }
343
344    pub fn on_notification<T, F>(&self, mut f: F) -> Subscription
345    where
346        T: notification::Notification,
347        F: 'static + Send + Sync + FnMut(T::Params),
348    {
349        let prev_handler = self.notification_handlers.write().insert(
350            T::METHOD,
351            Box::new(
352                move |notification| match serde_json::from_str(notification) {
353                    Ok(notification) => f(notification),
354                    Err(err) => log::error!("error parsing notification {}: {}", T::METHOD, err),
355                },
356            ),
357        );
358
359        assert!(
360            prev_handler.is_none(),
361            "registered multiple handlers for the same notification"
362        );
363
364        Subscription {
365            method: T::METHOD,
366            notification_handlers: self.notification_handlers.clone(),
367        }
368    }
369
    /// Returns a receiver for the server's capabilities; the watched value is `None` until
    /// the initialization handshake has succeeded.
    pub fn capabilities(&self) -> watch::Receiver<Option<ServerCapabilities>> {
        self.capabilities.clone()
    }
373
374    pub fn request<T: request::Request>(
375        self: &Arc<Self>,
376        params: T::Params,
377    ) -> impl Future<Output = Result<T::Result>>
378    where
379        T::Result: 'static + Send,
380    {
381        let this = self.clone();
382        async move {
383            this.initialized.clone().recv().await;
384            Self::request_internal::<T>(
385                &this.next_id,
386                &this.response_handlers,
387                &this.outbound_tx,
388                params,
389            )
390            .await
391        }
392    }
393
394    fn request_internal<T: request::Request>(
395        next_id: &AtomicUsize,
396        response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
397        outbound_tx: &channel::Sender<Vec<u8>>,
398        params: T::Params,
399    ) -> impl 'static + Future<Output = Result<T::Result>>
400    where
401        T::Result: 'static + Send,
402    {
403        let id = next_id.fetch_add(1, SeqCst);
404        let message = serde_json::to_vec(&Request {
405            jsonrpc: JSON_RPC_VERSION,
406            id,
407            method: T::METHOD,
408            params,
409        })
410        .unwrap();
411        let mut response_handlers = response_handlers.lock();
412        let (tx, rx) = oneshot::channel();
413        response_handlers.insert(
414            id,
415            Box::new(move |result| {
416                let response = match result {
417                    Ok(response) => {
418                        serde_json::from_str(response).context("failed to deserialize response")
419                    }
420                    Err(error) => Err(anyhow!("{}", error.message)),
421                };
422                let _ = tx.send(response);
423            }),
424        );
425
426        let send = outbound_tx
427            .try_send(message)
428            .context("failed to write to language server's stdin");
429        async move {
430            send?;
431            rx.await?
432        }
433    }
434
435    pub fn notify<T: notification::Notification>(
436        self: &Arc<Self>,
437        params: T::Params,
438    ) -> impl Future<Output = Result<()>> {
439        let this = self.clone();
440        async move {
441            this.initialized.clone().recv().await;
442            Self::notify_internal::<T>(&this.outbound_tx, params)?;
443            Ok(())
444        }
445    }
446
447    fn notify_internal<T: notification::Notification>(
448        outbound_tx: &channel::Sender<Vec<u8>>,
449        params: T::Params,
450    ) -> Result<()> {
451        let message = serde_json::to_vec(&Notification {
452            jsonrpc: JSON_RPC_VERSION,
453            method: T::METHOD,
454            params,
455        })
456        .unwrap();
457        outbound_tx.try_send(message)?;
458        Ok(())
459    }
460}
461
/// Shuts the server down in the background when the handle is dropped.
impl Drop for LanguageServer {
    fn drop(&mut self) {
        // `shutdown` returns `None` when it has already been called; otherwise the
        // returned future is run to completion on the background executor.
        if let Some(shutdown) = self.shutdown() {
            self.executor.spawn(shutdown).detach();
        }
    }
}
469
impl Subscription {
    /// Consumes the subscription while leaving its handler registered.
    ///
    /// The method name is replaced with `""` before the value is dropped, so the `Drop`
    /// implementation removes the `""` key instead of the real handler's entry.
    pub fn detach(mut self) {
        self.method = "";
    }
}
475
/// Removes the handler stored under this subscription's method name.
impl Drop for Subscription {
    fn drop(&mut self) {
        self.notification_handlers.write().remove(self.method);
    }
}
481
/// In-process stand-in for a language server, connected to a [`LanguageServer`] through
/// pipes.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeLanguageServer {
    /// Request handlers keyed by method name.
    handlers: FakeLanguageServerHandlers,
    /// Serialized messages queued here are framed and written to the client.
    outgoing_tx: futures::channel::mpsc::UnboundedSender<Vec<u8>>,
    /// Raw bodies of received messages that did not parse as requests.
    incoming_rx: futures::channel::mpsc::UnboundedReceiver<Vec<u8>>,
    /// Held so the reader task lives as long as the fake server.
    _input_task: Task<Result<()>>,
    /// Held so the writer task lives as long as the fake server.
    _output_task: Task<Result<()>>,
}
490
/// Table of the fake server's request handlers, keyed by method name.
///
/// Each handler receives the request id, the raw JSON bytes of the request's `params`, and
/// an app context, and returns the serialized response to send back.
#[cfg(any(test, feature = "test-support"))]
type FakeLanguageServerHandlers = Arc<
    Mutex<
        HashMap<
            &'static str,
            Box<dyn Send + FnMut(usize, &[u8], gpui::AsyncAppContext) -> Vec<u8>>,
        >,
    >,
>;
499
#[cfg(any(test, feature = "test-support"))]
impl LanguageServer {
    /// Creates a client connected to a fake server that reports default capabilities.
    pub fn fake(cx: &mut gpui::MutableAppContext) -> (Arc<Self>, FakeLanguageServer) {
        Self::fake_with_capabilities(Default::default(), cx)
    }

    /// Creates a client connected to a fake server that answers the `initialize` request
    /// with the given `capabilities`.
    ///
    /// The client uses `/` as its root path.
    pub fn fake_with_capabilities(
        capabilities: ServerCapabilities,
        cx: &mut gpui::MutableAppContext,
    ) -> (Arc<Self>, FakeLanguageServer) {
        // One pipe per direction: client -> fake ("stdin") and fake -> client ("stdout").
        let (stdin_writer, stdin_reader) = async_pipe::pipe();
        let (stdout_writer, stdout_reader) = async_pipe::pipe();

        let mut fake = FakeLanguageServer::new(stdin_reader, stdout_writer, cx);
        // Registered before the client exists so its `initialize` request finds a handler.
        // `capabilities` is owned here, so it moves into the handler directly; the handler
        // hands out a copy on every call.
        fake.handle_request::<request::Initialize, _>(move |_, _| InitializeResult {
            capabilities: capabilities.clone(),
            ..Default::default()
        });

        let server = Self::new_internal(
            stdin_writer,
            stdout_reader,
            Path::new("/"),
            cx.background().clone(),
        )
        .unwrap();

        (server, fake)
    }
}
533
534#[cfg(any(test, feature = "test-support"))]
535impl FakeLanguageServer {
536    fn new(
537        stdin: async_pipe::PipeReader,
538        stdout: async_pipe::PipeWriter,
539        cx: &mut gpui::MutableAppContext,
540    ) -> Self {
541        use futures::StreamExt as _;
542
543        let (incoming_tx, incoming_rx) = futures::channel::mpsc::unbounded();
544        let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded();
545        let handlers = FakeLanguageServerHandlers::default();
546
547        let input_task = cx.spawn(|cx| {
548            let handlers = handlers.clone();
549            let outgoing_tx = outgoing_tx.clone();
550            async move {
551                let mut buffer = Vec::new();
552                let mut stdin = smol::io::BufReader::new(stdin);
553                while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
554                    cx.background().simulate_random_delay().await;
555                    if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
556                        assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
557
558                        let response;
559                        if let Some(handler) = handlers.lock().get_mut(request.method) {
560                            response =
561                                handler(request.id, request.params.get().as_bytes(), cx.clone());
562                            log::debug!("handled lsp request. method:{}", request.method);
563                        } else {
564                            response = serde_json::to_vec(&AnyResponse {
565                                id: request.id,
566                                error: Some(Error {
567                                    message: "no handler".to_string(),
568                                }),
569                                result: None,
570                            })
571                            .unwrap();
572                            log::debug!("unhandled lsp request. method:{}", request.method);
573                        }
574                        outgoing_tx.unbounded_send(response)?;
575                    } else {
576                        incoming_tx.unbounded_send(buffer.clone())?;
577                    }
578                }
579                Ok::<_, anyhow::Error>(())
580            }
581        });
582
583        let output_task = cx.background().spawn(async move {
584            let mut stdout = smol::io::BufWriter::new(PipeWriterCloseOnDrop(stdout));
585            while let Some(message) = outgoing_rx.next().await {
586                stdout
587                    .write_all(CONTENT_LEN_HEADER.as_bytes())
588                    .await
589                    .unwrap();
590                stdout
591                    .write_all((format!("{}", message.len())).as_bytes())
592                    .await
593                    .unwrap();
594                stdout.write_all("\r\n\r\n".as_bytes()).await.unwrap();
595                stdout.write_all(&message).await.unwrap();
596                stdout.flush().await.unwrap();
597            }
598            Ok(())
599        });
600
601        Self {
602            outgoing_tx,
603            incoming_rx,
604            handlers,
605            _input_task: input_task,
606            _output_task: output_task,
607        }
608    }
609
610    pub async fn notify<T: notification::Notification>(&mut self, params: T::Params) {
611        let message = serde_json::to_vec(&Notification {
612            jsonrpc: JSON_RPC_VERSION,
613            method: T::METHOD,
614            params,
615        })
616        .unwrap();
617        self.outgoing_tx.unbounded_send(message).unwrap();
618    }
619
620    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
621        use futures::StreamExt as _;
622
623        loop {
624            let bytes = self.incoming_rx.next().await.unwrap();
625            if let Ok(notification) = serde_json::from_slice::<Notification<T::Params>>(&bytes) {
626                assert_eq!(notification.method, T::METHOD);
627                return notification.params;
628            } else {
629                log::info!(
630                    "skipping message in fake language server {:?}",
631                    std::str::from_utf8(&bytes)
632                );
633            }
634        }
635    }
636
637    pub fn handle_request<T, F>(
638        &mut self,
639        mut handler: F,
640    ) -> futures::channel::mpsc::UnboundedReceiver<()>
641    where
642        T: 'static + request::Request,
643        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> T::Result,
644    {
645        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
646        self.handlers.lock().insert(
647            T::METHOD,
648            Box::new(move |id, params, cx| {
649                let result = handler(serde_json::from_slice::<T::Params>(params).unwrap(), cx);
650                let result = serde_json::to_string(&result).unwrap();
651                let result = serde_json::from_str::<&RawValue>(&result).unwrap();
652                let response = AnyResponse {
653                    id,
654                    error: None,
655                    result: Some(result),
656                };
657                responded_tx.unbounded_send(()).ok();
658                serde_json::to_vec(&response).unwrap()
659            }),
660        );
661        responded_rx
662    }
663
    /// Removes the handler installed for requests of type `T`, if there is one.
    pub fn remove_request_handler<T>(&mut self)
    where
        T: 'static + request::Request,
    {
        self.handlers.lock().remove(T::METHOD);
    }
670
671    pub async fn start_progress(&mut self, token: impl Into<String>) {
672        self.notify::<notification::Progress>(ProgressParams {
673            token: NumberOrString::String(token.into()),
674            value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
675        })
676        .await;
677    }
678
679    pub async fn end_progress(&mut self, token: impl Into<String>) {
680        self.notify::<notification::Progress>(ProgressParams {
681            token: NumberOrString::String(token.into()),
682            value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
683        })
684        .await;
685    }
686
687    async fn receive(
688        stdin: &mut smol::io::BufReader<async_pipe::PipeReader>,
689        buffer: &mut Vec<u8>,
690    ) -> Result<()> {
691        buffer.clear();
692        stdin.read_until(b'\n', buffer).await?;
693        stdin.read_until(b'\n', buffer).await?;
694        let message_len: usize = std::str::from_utf8(buffer)
695            .unwrap()
696            .strip_prefix(CONTENT_LEN_HEADER)
697            .unwrap()
698            .trim_end()
699            .parse()
700            .unwrap();
701        buffer.resize(message_len, 0);
702        stdin.read_exact(buffer).await?;
703        Ok(())
704    }
705}
706
/// Wrapper around a pipe writer that closes the pipe when the wrapper is dropped.
struct PipeWriterCloseOnDrop(async_pipe::PipeWriter);
708
/// Closes the wrapped pipe writer; a failure to close is ignored.
impl Drop for PipeWriterCloseOnDrop {
    fn drop(&mut self) {
        self.0.close().ok();
    }
}
714
715impl AsyncWrite for PipeWriterCloseOnDrop {
716    fn poll_write(
717        mut self: std::pin::Pin<&mut Self>,
718        cx: &mut std::task::Context<'_>,
719        buf: &[u8],
720    ) -> std::task::Poll<std::io::Result<usize>> {
721        let pipe = &mut self.0;
722        smol::pin!(pipe);
723        pipe.poll_write(cx, buf)
724    }
725
726    fn poll_flush(
727        mut self: std::pin::Pin<&mut Self>,
728        cx: &mut std::task::Context<'_>,
729    ) -> std::task::Poll<std::io::Result<()>> {
730        let pipe = &mut self.0;
731        smol::pin!(pipe);
732        pipe.poll_flush(cx)
733    }
734
735    fn poll_close(
736        mut self: std::pin::Pin<&mut Self>,
737        cx: &mut std::task::Context<'_>,
738    ) -> std::task::Poll<std::io::Result<()>> {
739        let pipe = &mut self.0;
740        smol::pin!(pipe);
741        pipe.poll_close(cx)
742    }
743}
744
/// Guard that empties the shared table of pending response handlers when it is dropped.
struct ClearResponseChannels(Arc<Mutex<HashMap<usize, ResponseHandler>>>);
746
/// Removes every pending response handler from the wrapped table.
impl Drop for ClearResponseChannels {
    fn drop(&mut self) {
        self.0.lock().clear();
    }
}
752
#[cfg(test)]
mod tests {
    use super::*;
    use gpui::TestAppContext;

    // Enables logging for the test binary only when `RUST_LOG` is set.
    #[ctor::ctor]
    fn init_logger() {
        if std::env::var("RUST_LOG").is_ok() {
            env_logger::init();
        }
    }

    // Exercises a client/fake-server pair end to end: a notification in each direction,
    // followed by the shutdown sequence that dropping the client triggers.
    #[gpui::test]
    async fn test_fake(cx: &mut TestAppContext) {
        let (server, mut fake) = cx.update(LanguageServer::fake);

        // Forward notifications received by the client into channels the test can await.
        let (message_tx, message_rx) = channel::unbounded();
        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
        server
            .on_notification::<notification::ShowMessage, _>(move |params| {
                message_tx.try_send(params).unwrap()
            })
            .detach();
        server
            .on_notification::<notification::PublishDiagnostics, _>(move |params| {
                diagnostics_tx.try_send(params).unwrap()
            })
            .detach();

        // Client -> fake server.
        server
            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
                text_document: TextDocumentItem::new(
                    Url::from_str("file://a/b").unwrap(),
                    "rust".to_string(),
                    0,
                    "".to_string(),
                ),
            })
            .await
            .unwrap();
        assert_eq!(
            fake.receive_notification::<notification::DidOpenTextDocument>()
                .await
                .text_document
                .uri
                .as_str(),
            "file://a/b"
        );

        // Fake server -> client.
        fake.notify::<notification::ShowMessage>(ShowMessageParams {
            typ: MessageType::ERROR,
            message: "ok".to_string(),
        })
        .await;
        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
            uri: Url::from_str("file://b/c").unwrap(),
            version: Some(5),
            diagnostics: vec![],
        })
        .await;
        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
        assert_eq!(
            diagnostics_rx.recv().await.unwrap().uri.as_str(),
            "file://b/c"
        );

        // Dropping the client sends a shutdown request followed by an exit notification;
        // the fake needs a handler so the shutdown request gets a successful response.
        fake.handle_request::<request::Shutdown, _>(|_, _| ());

        drop(server);
        fake.receive_notification::<notification::Exit>().await;
    }

    // Notification type for the "experimental/serverStatus" method.
    pub enum ServerStatusNotification {}

    impl notification::Notification for ServerStatusNotification {
        type Params = ServerStatusParams;
        const METHOD: &'static str = "experimental/serverStatus";
    }

    // Parameters of `ServerStatusNotification`.
    #[derive(Deserialize, Serialize, PartialEq, Eq, Clone)]
    pub struct ServerStatusParams {
        pub quiescent: bool,
    }
}
835    }
836}