lsp.rs

  1use anyhow::{anyhow, Context, Result};
  2use futures::{io::BufWriter, AsyncRead, AsyncWrite};
  3use gpui::{executor, Task};
  4use parking_lot::{Mutex, RwLock};
  5use postage::{barrier, oneshot, prelude::Stream, sink::Sink, watch};
  6use serde::{Deserialize, Serialize};
  7use serde_json::{json, value::RawValue, Value};
  8use smol::{
  9    channel,
 10    io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
 11    process::Command,
 12};
 13use std::{
 14    collections::HashMap,
 15    future::Future,
 16    io::Write,
 17    str::FromStr,
 18    sync::{
 19        atomic::{AtomicUsize, Ordering::SeqCst},
 20        Arc,
 21    },
 22};
 23use std::{path::Path, process::Stdio};
 24use util::TryFutureExt;
 25
 26pub use lsp_types::*;
 27
 28const JSON_RPC_VERSION: &'static str = "2.0";
 29const CONTENT_LEN_HEADER: &'static str = "Content-Length: ";
 30
 31type NotificationHandler = Box<dyn Send + Sync + FnMut(&str)>;
 32type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
 33
 34pub struct LanguageServer {
 35    next_id: AtomicUsize,
 36    outbound_tx: RwLock<Option<channel::Sender<Vec<u8>>>>,
 37    capabilities: watch::Receiver<Option<ServerCapabilities>>,
 38    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
 39    response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
 40    executor: Arc<executor::Background>,
 41    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
 42    initialized: barrier::Receiver,
 43    output_done_rx: Mutex<Option<barrier::Receiver>>,
 44}
 45
 46pub struct Subscription {
 47    method: &'static str,
 48    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
 49}
 50
 51#[derive(Serialize, Deserialize)]
 52struct Request<'a, T> {
 53    jsonrpc: &'a str,
 54    id: usize,
 55    method: &'a str,
 56    params: T,
 57}
 58
 59#[derive(Serialize, Deserialize)]
 60struct AnyResponse<'a> {
 61    id: usize,
 62    #[serde(default)]
 63    error: Option<Error>,
 64    #[serde(borrow)]
 65    result: Option<&'a RawValue>,
 66}
 67
 68#[derive(Serialize, Deserialize)]
 69struct Notification<'a, T> {
 70    #[serde(borrow)]
 71    jsonrpc: &'a str,
 72    #[serde(borrow)]
 73    method: &'a str,
 74    params: T,
 75}
 76
 77#[derive(Deserialize)]
 78struct AnyNotification<'a> {
 79    #[serde(borrow)]
 80    method: &'a str,
 81    #[serde(borrow)]
 82    params: &'a RawValue,
 83}
 84
 85#[derive(Debug, Serialize, Deserialize)]
 86struct Error {
 87    message: String,
 88}
 89
 90impl LanguageServer {
 91    pub fn new(
 92        binary_path: &Path,
 93        root_path: &Path,
 94        background: Arc<executor::Background>,
 95    ) -> Result<Arc<Self>> {
 96        let mut server = Command::new(binary_path)
 97            .stdin(Stdio::piped())
 98            .stdout(Stdio::piped())
 99            .stderr(Stdio::inherit())
100            .spawn()?;
101        let stdin = server.stdin.take().unwrap();
102        let stdout = server.stdout.take().unwrap();
103        Self::new_internal(stdin, stdout, root_path, background)
104    }
105
106    fn new_internal<Stdin, Stdout>(
107        stdin: Stdin,
108        stdout: Stdout,
109        root_path: &Path,
110        executor: Arc<executor::Background>,
111    ) -> Result<Arc<Self>>
112    where
113        Stdin: AsyncWrite + Unpin + Send + 'static,
114        Stdout: AsyncRead + Unpin + Send + 'static,
115    {
116        let mut stdin = BufWriter::new(stdin);
117        let mut stdout = BufReader::new(stdout);
118        let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
119        let notification_handlers = Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::new()));
120        let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::new()));
121        let input_task = executor.spawn(
122            {
123                let notification_handlers = notification_handlers.clone();
124                let response_handlers = response_handlers.clone();
125                async move {
126                    let mut buffer = Vec::new();
127                    loop {
128                        buffer.clear();
129                        stdout.read_until(b'\n', &mut buffer).await?;
130                        stdout.read_until(b'\n', &mut buffer).await?;
131                        let message_len: usize = std::str::from_utf8(&buffer)?
132                            .strip_prefix(CONTENT_LEN_HEADER)
133                            .ok_or_else(|| anyhow!("invalid header"))?
134                            .trim_end()
135                            .parse()?;
136
137                        buffer.resize(message_len, 0);
138                        stdout.read_exact(&mut buffer).await?;
139
140                        if let Ok(AnyNotification { method, params }) =
141                            serde_json::from_slice(&buffer)
142                        {
143                            if let Some(handler) = notification_handlers.write().get_mut(method) {
144                                handler(params.get());
145                            } else {
146                                log::info!(
147                                    "unhandled notification {}:\n{}",
148                                    method,
149                                    serde_json::to_string_pretty(
150                                        &Value::from_str(params.get()).unwrap()
151                                    )
152                                    .unwrap()
153                                );
154                            }
155                        } else if let Ok(AnyResponse { id, error, result }) =
156                            serde_json::from_slice(&buffer)
157                        {
158                            if let Some(handler) = response_handlers.lock().remove(&id) {
159                                if let Some(error) = error {
160                                    handler(Err(error));
161                                } else if let Some(result) = result {
162                                    handler(Ok(result.get()));
163                                } else {
164                                    handler(Ok("null"));
165                                }
166                            }
167                        } else {
168                            return Err(anyhow!(
169                                "failed to deserialize message:\n{}",
170                                std::str::from_utf8(&buffer)?
171                            ));
172                        }
173                    }
174                }
175            }
176            .log_err(),
177        );
178        let (output_done_tx, output_done_rx) = barrier::channel();
179        let output_task = executor.spawn(
180            async move {
181                let mut content_len_buffer = Vec::new();
182                while let Ok(message) = outbound_rx.recv().await {
183                    content_len_buffer.clear();
184                    write!(content_len_buffer, "{}", message.len()).unwrap();
185                    stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
186                    stdin.write_all(&content_len_buffer).await?;
187                    stdin.write_all("\r\n\r\n".as_bytes()).await?;
188                    stdin.write_all(&message).await?;
189                    stdin.flush().await?;
190                }
191                drop(output_done_tx);
192                Ok(())
193            }
194            .log_err(),
195        );
196
197        let (initialized_tx, initialized_rx) = barrier::channel();
198        let (mut capabilities_tx, capabilities_rx) = watch::channel();
199        let this = Arc::new(Self {
200            notification_handlers,
201            response_handlers,
202            capabilities: capabilities_rx,
203            next_id: Default::default(),
204            outbound_tx: RwLock::new(Some(outbound_tx)),
205            executor: executor.clone(),
206            io_tasks: Mutex::new(Some((input_task, output_task))),
207            initialized: initialized_rx,
208            output_done_rx: Mutex::new(Some(output_done_rx)),
209        });
210
211        let root_uri = Url::from_file_path(root_path).map_err(|_| anyhow!("invalid root path"))?;
212        executor
213            .spawn({
214                let this = this.clone();
215                async move {
216                    if let Some(capabilities) = this.init(root_uri).log_err().await {
217                        *capabilities_tx.borrow_mut() = Some(capabilities);
218                    }
219
220                    drop(initialized_tx);
221                }
222            })
223            .detach();
224
225        Ok(this)
226    }
227
228    async fn init(self: Arc<Self>, root_uri: Url) -> Result<ServerCapabilities> {
229        #[allow(deprecated)]
230        let params = InitializeParams {
231            process_id: Default::default(),
232            root_path: Default::default(),
233            root_uri: Some(root_uri),
234            initialization_options: Default::default(),
235            capabilities: ClientCapabilities {
236                text_document: Some(TextDocumentClientCapabilities {
237                    definition: Some(GotoCapability {
238                        link_support: Some(true),
239                        ..Default::default()
240                    }),
241                    code_action: Some(CodeActionClientCapabilities {
242                        code_action_literal_support: Some(CodeActionLiteralSupport {
243                            code_action_kind: CodeActionKindLiteralSupport {
244                                value_set: vec![
245                                    CodeActionKind::REFACTOR.as_str().into(),
246                                    CodeActionKind::QUICKFIX.as_str().into(),
247                                ],
248                            },
249                        }),
250                        data_support: Some(true),
251                        resolve_support: Some(CodeActionCapabilityResolveSupport {
252                            properties: vec!["edit".to_string()],
253                        }),
254                        ..Default::default()
255                    }),
256                    completion: Some(CompletionClientCapabilities {
257                        completion_item: Some(CompletionItemCapability {
258                            snippet_support: Some(true),
259                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
260                                properties: vec!["additionalTextEdits".to_string()],
261                            }),
262                            ..Default::default()
263                        }),
264                        ..Default::default()
265                    }),
266                    ..Default::default()
267                }),
268                experimental: Some(json!({
269                    "serverStatusNotification": true,
270                })),
271                window: Some(WindowClientCapabilities {
272                    work_done_progress: Some(true),
273                    ..Default::default()
274                }),
275                ..Default::default()
276            },
277            trace: Default::default(),
278            workspace_folders: Default::default(),
279            client_info: Default::default(),
280            locale: Default::default(),
281        };
282
283        let this = self.clone();
284        let request = Self::request_internal::<request::Initialize>(
285            &this.next_id,
286            &this.response_handlers,
287            this.outbound_tx.read().as_ref(),
288            params,
289        );
290        let response = request.await?;
291        Self::notify_internal::<notification::Initialized>(
292            this.outbound_tx.read().as_ref(),
293            InitializedParams {},
294        )?;
295        Ok(response.capabilities)
296    }
297
298    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Result<()>>> {
299        if let Some(tasks) = self.io_tasks.lock().take() {
300            let response_handlers = self.response_handlers.clone();
301            let outbound_tx = self.outbound_tx.write().take();
302            let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
303            let mut output_done = self.output_done_rx.lock().take().unwrap();
304            Some(async move {
305                Self::request_internal::<request::Shutdown>(
306                    &next_id,
307                    &response_handlers,
308                    outbound_tx.as_ref(),
309                    (),
310                )
311                .await?;
312                Self::notify_internal::<notification::Exit>(outbound_tx.as_ref(), ())?;
313                drop(outbound_tx);
314                output_done.recv().await;
315                drop(tasks);
316                Ok(())
317            })
318        } else {
319            None
320        }
321    }
322
323    pub fn on_notification<T, F>(&self, mut f: F) -> Subscription
324    where
325        T: notification::Notification,
326        F: 'static + Send + Sync + FnMut(T::Params),
327    {
328        let prev_handler = self.notification_handlers.write().insert(
329            T::METHOD,
330            Box::new(
331                move |notification| match serde_json::from_str(notification) {
332                    Ok(notification) => f(notification),
333                    Err(err) => log::error!("error parsing notification {}: {}", T::METHOD, err),
334                },
335            ),
336        );
337
338        assert!(
339            prev_handler.is_none(),
340            "registered multiple handlers for the same notification"
341        );
342
343        Subscription {
344            method: T::METHOD,
345            notification_handlers: self.notification_handlers.clone(),
346        }
347    }
348
349    pub fn capabilities(&self) -> watch::Receiver<Option<ServerCapabilities>> {
350        self.capabilities.clone()
351    }
352
353    pub fn request<T: request::Request>(
354        self: &Arc<Self>,
355        params: T::Params,
356    ) -> impl Future<Output = Result<T::Result>>
357    where
358        T::Result: 'static + Send,
359    {
360        let this = self.clone();
361        async move {
362            this.initialized.clone().recv().await;
363            Self::request_internal::<T>(
364                &this.next_id,
365                &this.response_handlers,
366                this.outbound_tx.read().as_ref(),
367                params,
368            )
369            .await
370        }
371    }
372
373    fn request_internal<T: request::Request>(
374        next_id: &AtomicUsize,
375        response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
376        outbound_tx: Option<&channel::Sender<Vec<u8>>>,
377        params: T::Params,
378    ) -> impl 'static + Future<Output = Result<T::Result>>
379    where
380        T::Result: 'static + Send,
381    {
382        let id = next_id.fetch_add(1, SeqCst);
383        let message = serde_json::to_vec(&Request {
384            jsonrpc: JSON_RPC_VERSION,
385            id,
386            method: T::METHOD,
387            params,
388        })
389        .unwrap();
390        let mut response_handlers = response_handlers.lock();
391        let (mut tx, mut rx) = oneshot::channel();
392        response_handlers.insert(
393            id,
394            Box::new(move |result| {
395                let response = match result {
396                    Ok(response) => {
397                        serde_json::from_str(response).context("failed to deserialize response")
398                    }
399                    Err(error) => Err(anyhow!("{}", error.message)),
400                };
401                let _ = tx.try_send(response);
402            }),
403        );
404
405        let send = outbound_tx
406            .as_ref()
407            .ok_or_else(|| {
408                anyhow!("tried to send a request to a language server that has been shut down")
409            })
410            .and_then(|outbound_tx| {
411                outbound_tx.try_send(message)?;
412                Ok(())
413            });
414        async move {
415            send?;
416            rx.recv().await.unwrap()
417        }
418    }
419
420    pub fn notify<T: notification::Notification>(
421        self: &Arc<Self>,
422        params: T::Params,
423    ) -> impl Future<Output = Result<()>> {
424        let this = self.clone();
425        async move {
426            this.initialized.clone().recv().await;
427            Self::notify_internal::<T>(this.outbound_tx.read().as_ref(), params)?;
428            Ok(())
429        }
430    }
431
432    fn notify_internal<T: notification::Notification>(
433        outbound_tx: Option<&channel::Sender<Vec<u8>>>,
434        params: T::Params,
435    ) -> Result<()> {
436        let message = serde_json::to_vec(&Notification {
437            jsonrpc: JSON_RPC_VERSION,
438            method: T::METHOD,
439            params,
440        })
441        .unwrap();
442        let outbound_tx = outbound_tx
443            .as_ref()
444            .ok_or_else(|| anyhow!("tried to notify a language server that has been shut down"))?;
445        outbound_tx.try_send(message)?;
446        Ok(())
447    }
448}
449
450impl Drop for LanguageServer {
451    fn drop(&mut self) {
452        if let Some(shutdown) = self.shutdown() {
453            self.executor.spawn(shutdown).detach();
454        }
455    }
456}
457
458impl Subscription {
459    pub fn detach(mut self) {
460        self.method = "";
461    }
462}
463
464impl Drop for Subscription {
465    fn drop(&mut self) {
466        self.notification_handlers.write().remove(self.method);
467    }
468}
469
470#[cfg(any(test, feature = "test-support"))]
471pub struct FakeLanguageServer {
472    buffer: Vec<u8>,
473    stdin: smol::io::BufReader<async_pipe::PipeReader>,
474    stdout: smol::io::BufWriter<async_pipe::PipeWriter>,
475    executor: std::rc::Rc<executor::Foreground>,
476    pub started: Arc<std::sync::atomic::AtomicBool>,
477}
478
479#[cfg(any(test, feature = "test-support"))]
480pub struct RequestId<T> {
481    id: usize,
482    _type: std::marker::PhantomData<T>,
483}
484
485#[cfg(any(test, feature = "test-support"))]
486impl LanguageServer {
487    pub async fn fake(cx: &gpui::TestAppContext) -> (Arc<Self>, FakeLanguageServer) {
488        Self::fake_with_capabilities(Default::default(), cx).await
489    }
490
491    pub async fn fake_with_capabilities(
492        capabilities: ServerCapabilities,
493        cx: &gpui::TestAppContext,
494    ) -> (Arc<Self>, FakeLanguageServer) {
495        let stdin = async_pipe::pipe();
496        let stdout = async_pipe::pipe();
497        let mut fake = FakeLanguageServer {
498            stdin: smol::io::BufReader::new(stdin.1),
499            stdout: smol::io::BufWriter::new(stdout.0),
500            buffer: Vec::new(),
501            executor: cx.foreground(),
502            started: Arc::new(std::sync::atomic::AtomicBool::new(true)),
503        };
504
505        let server =
506            Self::new_internal(stdin.0, stdout.1, Path::new("/"), cx.background()).unwrap();
507
508        let (init_id, _) = fake.receive_request::<request::Initialize>().await;
509        fake.respond(
510            init_id,
511            InitializeResult {
512                capabilities,
513                ..Default::default()
514            },
515        )
516        .await;
517        fake.receive_notification::<notification::Initialized>()
518            .await;
519
520        (server, fake)
521    }
522}
523
524#[cfg(any(test, feature = "test-support"))]
525impl FakeLanguageServer {
526    pub async fn notify<T: notification::Notification>(&mut self, params: T::Params) {
527        if !self.started.load(std::sync::atomic::Ordering::SeqCst) {
528            panic!("can't simulate an LSP notification before the server has been started");
529        }
530        let message = serde_json::to_vec(&Notification {
531            jsonrpc: JSON_RPC_VERSION,
532            method: T::METHOD,
533            params,
534        })
535        .unwrap();
536        self.send(message).await;
537    }
538
539    pub async fn respond<'a, T: request::Request>(
540        &mut self,
541        request_id: RequestId<T>,
542        result: T::Result,
543    ) {
544        let result = serde_json::to_string(&result).unwrap();
545        let message = serde_json::to_vec(&AnyResponse {
546            id: request_id.id,
547            error: None,
548            result: Some(&RawValue::from_string(result).unwrap()),
549        })
550        .unwrap();
551        self.send(message).await;
552    }
553
554    pub async fn receive_request<T: request::Request>(&mut self) -> (RequestId<T>, T::Params) {
555        let executor = self.executor.clone();
556        executor.start_waiting();
557        loop {
558            self.receive().await;
559            if let Ok(request) = serde_json::from_slice::<Request<T::Params>>(&self.buffer) {
560                assert_eq!(request.method, T::METHOD);
561                assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
562                executor.finish_waiting();
563                return (
564                    RequestId {
565                        id: request.id,
566                        _type: std::marker::PhantomData,
567                    },
568                    request.params,
569                );
570            } else {
571                log::info!(
572                    "skipping message in fake language server {:?}",
573                    std::str::from_utf8(&self.buffer)
574                );
575            }
576        }
577    }
578
579    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
580        self.receive().await;
581        let notification = serde_json::from_slice::<Notification<T::Params>>(&self.buffer).unwrap();
582        assert_eq!(notification.method, T::METHOD);
583        notification.params
584    }
585
586    pub async fn start_progress(&mut self, token: impl Into<String>) {
587        self.notify::<notification::Progress>(ProgressParams {
588            token: NumberOrString::String(token.into()),
589            value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
590        })
591        .await;
592    }
593
594    pub async fn end_progress(&mut self, token: impl Into<String>) {
595        self.notify::<notification::Progress>(ProgressParams {
596            token: NumberOrString::String(token.into()),
597            value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
598        })
599        .await;
600    }
601
602    async fn send(&mut self, message: Vec<u8>) {
603        self.stdout
604            .write_all(CONTENT_LEN_HEADER.as_bytes())
605            .await
606            .unwrap();
607        self.stdout
608            .write_all((format!("{}", message.len())).as_bytes())
609            .await
610            .unwrap();
611        self.stdout.write_all("\r\n\r\n".as_bytes()).await.unwrap();
612        self.stdout.write_all(&message).await.unwrap();
613        self.stdout.flush().await.unwrap();
614    }
615
616    async fn receive(&mut self) {
617        self.buffer.clear();
618        self.stdin
619            .read_until(b'\n', &mut self.buffer)
620            .await
621            .unwrap();
622        self.stdin
623            .read_until(b'\n', &mut self.buffer)
624            .await
625            .unwrap();
626        let message_len: usize = std::str::from_utf8(&self.buffer)
627            .unwrap()
628            .strip_prefix(CONTENT_LEN_HEADER)
629            .unwrap()
630            .trim_end()
631            .parse()
632            .unwrap();
633        self.buffer.resize(message_len, 0);
634        self.stdin.read_exact(&mut self.buffer).await.unwrap();
635    }
636}
637
638#[cfg(test)]
639mod tests {
640    use super::*;
641    use gpui::TestAppContext;
642    use simplelog::SimpleLogger;
643    use unindent::Unindent;
644    use util::test::temp_tree;
645
646    #[gpui::test]
647    async fn test_rust_analyzer(cx: TestAppContext) {
648        let lib_source = r#"
649            fn fun() {
650                let hello = "world";
651            }
652        "#
653        .unindent();
654        let root_dir = temp_tree(json!({
655            "Cargo.toml": r#"
656                [package]
657                name = "temp"
658                version = "0.1.0"
659                edition = "2018"
660            "#.unindent(),
661            "src": {
662                "lib.rs": &lib_source
663            }
664        }));
665        let lib_file_uri = Url::from_file_path(root_dir.path().join("src/lib.rs")).unwrap();
666
667        let server = cx.read(|cx| {
668            LanguageServer::new(
669                Path::new("rust-analyzer"),
670                root_dir.path(),
671                cx.background().clone(),
672            )
673            .unwrap()
674        });
675        server.next_idle_notification().await;
676
677        server
678            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
679                text_document: TextDocumentItem::new(
680                    lib_file_uri.clone(),
681                    "rust".to_string(),
682                    0,
683                    lib_source,
684                ),
685            })
686            .await
687            .unwrap();
688
689        let hover = server
690            .request::<request::HoverRequest>(HoverParams {
691                text_document_position_params: TextDocumentPositionParams {
692                    text_document: TextDocumentIdentifier::new(lib_file_uri),
693                    position: Position::new(1, 21),
694                },
695                work_done_progress_params: Default::default(),
696            })
697            .await
698            .unwrap()
699            .unwrap();
700        assert_eq!(
701            hover.contents,
702            HoverContents::Markup(MarkupContent {
703                kind: MarkupKind::PlainText,
704                value: "&str".to_string()
705            })
706        );
707    }
708
709    #[gpui::test]
710    async fn test_fake(cx: TestAppContext) {
711        SimpleLogger::init(log::LevelFilter::Info, Default::default()).unwrap();
712
713        let (server, mut fake) = LanguageServer::fake(&cx).await;
714
715        let (message_tx, message_rx) = channel::unbounded();
716        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
717        server
718            .on_notification::<notification::ShowMessage, _>(move |params| {
719                message_tx.try_send(params).unwrap()
720            })
721            .detach();
722        server
723            .on_notification::<notification::PublishDiagnostics, _>(move |params| {
724                diagnostics_tx.try_send(params).unwrap()
725            })
726            .detach();
727
728        server
729            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
730                text_document: TextDocumentItem::new(
731                    Url::from_str("file://a/b").unwrap(),
732                    "rust".to_string(),
733                    0,
734                    "".to_string(),
735                ),
736            })
737            .await
738            .unwrap();
739        assert_eq!(
740            fake.receive_notification::<notification::DidOpenTextDocument>()
741                .await
742                .text_document
743                .uri
744                .as_str(),
745            "file://a/b"
746        );
747
748        fake.notify::<notification::ShowMessage>(ShowMessageParams {
749            typ: MessageType::ERROR,
750            message: "ok".to_string(),
751        })
752        .await;
753        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
754            uri: Url::from_str("file://b/c").unwrap(),
755            version: Some(5),
756            diagnostics: vec![],
757        })
758        .await;
759        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
760        assert_eq!(
761            diagnostics_rx.recv().await.unwrap().uri.as_str(),
762            "file://b/c"
763        );
764
765        drop(server);
766        let (shutdown_request, _) = fake.receive_request::<request::Shutdown>().await;
767        fake.respond(shutdown_request, ()).await;
768        fake.receive_notification::<notification::Exit>().await;
769    }
770
771    impl LanguageServer {
772        async fn next_idle_notification(self: &Arc<Self>) {
773            let (tx, rx) = channel::unbounded();
774            let _subscription =
775                self.on_notification::<ServerStatusNotification, _>(move |params| {
776                    if params.quiescent {
777                        tx.try_send(()).unwrap();
778                    }
779                });
780            let _ = rx.recv().await;
781        }
782    }
783
784    pub enum ServerStatusNotification {}
785
786    impl notification::Notification for ServerStatusNotification {
787        type Params = ServerStatusParams;
788        const METHOD: &'static str = "experimental/serverStatus";
789    }
790
791    #[derive(Deserialize, Serialize, PartialEq, Eq, Clone)]
792    pub struct ServerStatusParams {
793        pub quiescent: bool,
794    }
795}