lsp.rs

  1use anyhow::{anyhow, Context, Result};
  2use futures::{io::BufWriter, AsyncRead, AsyncWrite};
  3use gpui::{executor, Task};
  4use parking_lot::{Mutex, RwLock};
  5use postage::{barrier, oneshot, prelude::Stream, sink::Sink, watch};
  6use serde::{Deserialize, Serialize};
  7use serde_json::{json, value::RawValue, Value};
  8use smol::{
  9    channel,
 10    io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
 11    process::Command,
 12};
 13use std::{
 14    collections::HashMap,
 15    future::Future,
 16    io::Write,
 17    str::FromStr,
 18    sync::{
 19        atomic::{AtomicUsize, Ordering::SeqCst},
 20        Arc,
 21    },
 22};
 23use std::{path::Path, process::Stdio};
 24use util::TryFutureExt;
 25
 26pub use lsp_types::*;
 27
 28const JSON_RPC_VERSION: &'static str = "2.0";
 29const CONTENT_LEN_HEADER: &'static str = "Content-Length: ";
 30
 31type NotificationHandler = Box<dyn Send + Sync + FnMut(&str)>;
 32type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
 33
 34pub struct LanguageServer {
 35    next_id: AtomicUsize,
 36    outbound_tx: channel::Sender<Vec<u8>>,
 37    capabilities: watch::Receiver<Option<ServerCapabilities>>,
 38    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
 39    response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
 40    executor: Arc<executor::Background>,
 41    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
 42    initialized: barrier::Receiver,
 43    output_done_rx: Mutex<Option<barrier::Receiver>>,
 44}
 45
 46pub struct Subscription {
 47    method: &'static str,
 48    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
 49}
 50
 51#[derive(Serialize, Deserialize)]
 52struct Request<'a, T> {
 53    jsonrpc: &'a str,
 54    id: usize,
 55    method: &'a str,
 56    params: T,
 57}
 58
 59#[cfg(any(test, feature = "test-support"))]
 60#[derive(Deserialize)]
 61struct AnyRequest<'a> {
 62    id: usize,
 63    #[serde(borrow)]
 64    jsonrpc: &'a str,
 65    #[serde(borrow)]
 66    method: &'a str,
 67    #[serde(borrow)]
 68    params: &'a RawValue,
 69}
 70
 71#[derive(Serialize, Deserialize)]
 72struct AnyResponse<'a> {
 73    id: usize,
 74    #[serde(default)]
 75    error: Option<Error>,
 76    #[serde(borrow)]
 77    result: Option<&'a RawValue>,
 78}
 79
 80#[derive(Serialize, Deserialize)]
 81struct Notification<'a, T> {
 82    #[serde(borrow)]
 83    jsonrpc: &'a str,
 84    #[serde(borrow)]
 85    method: &'a str,
 86    params: T,
 87}
 88
 89#[derive(Deserialize)]
 90struct AnyNotification<'a> {
 91    #[serde(borrow)]
 92    method: &'a str,
 93    #[serde(borrow)]
 94    params: &'a RawValue,
 95}
 96
 97#[derive(Debug, Serialize, Deserialize)]
 98struct Error {
 99    message: String,
100}
101
102impl LanguageServer {
103    pub fn new(
104        binary_path: &Path,
105        root_path: &Path,
106        background: Arc<executor::Background>,
107    ) -> Result<Arc<Self>> {
108        let mut server = Command::new(binary_path)
109            .stdin(Stdio::piped())
110            .stdout(Stdio::piped())
111            .stderr(Stdio::inherit())
112            .spawn()?;
113        let stdin = server.stdin.take().unwrap();
114        let stdout = server.stdout.take().unwrap();
115        Self::new_internal(stdin, stdout, root_path, background)
116    }
117
118    fn new_internal<Stdin, Stdout>(
119        stdin: Stdin,
120        stdout: Stdout,
121        root_path: &Path,
122        executor: Arc<executor::Background>,
123    ) -> Result<Arc<Self>>
124    where
125        Stdin: AsyncWrite + Unpin + Send + 'static,
126        Stdout: AsyncRead + Unpin + Send + 'static,
127    {
128        let mut stdin = BufWriter::new(stdin);
129        let mut stdout = BufReader::new(stdout);
130        let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
131        let notification_handlers = Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::new()));
132        let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::new()));
133        let input_task = executor.spawn(
134            {
135                let notification_handlers = notification_handlers.clone();
136                let response_handlers = response_handlers.clone();
137                async move {
138                    let mut buffer = Vec::new();
139                    loop {
140                        buffer.clear();
141                        stdout.read_until(b'\n', &mut buffer).await?;
142                        stdout.read_until(b'\n', &mut buffer).await?;
143                        let message_len: usize = std::str::from_utf8(&buffer)?
144                            .strip_prefix(CONTENT_LEN_HEADER)
145                            .ok_or_else(|| anyhow!("invalid header"))?
146                            .trim_end()
147                            .parse()?;
148
149                        buffer.resize(message_len, 0);
150                        stdout.read_exact(&mut buffer).await?;
151
152                        if let Ok(AnyNotification { method, params }) =
153                            serde_json::from_slice(&buffer)
154                        {
155                            if let Some(handler) = notification_handlers.write().get_mut(method) {
156                                handler(params.get());
157                            } else {
158                                log::info!(
159                                    "unhandled notification {}:\n{}",
160                                    method,
161                                    serde_json::to_string_pretty(
162                                        &Value::from_str(params.get()).unwrap()
163                                    )
164                                    .unwrap()
165                                );
166                            }
167                        } else if let Ok(AnyResponse { id, error, result }) =
168                            serde_json::from_slice(&buffer)
169                        {
170                            if let Some(handler) = response_handlers.lock().remove(&id) {
171                                if let Some(error) = error {
172                                    handler(Err(error));
173                                } else if let Some(result) = result {
174                                    handler(Ok(result.get()));
175                                } else {
176                                    handler(Ok("null"));
177                                }
178                            }
179                        } else {
180                            return Err(anyhow!(
181                                "failed to deserialize message:\n{}",
182                                std::str::from_utf8(&buffer)?
183                            ));
184                        }
185                    }
186                }
187            }
188            .log_err(),
189        );
190        let (output_done_tx, output_done_rx) = barrier::channel();
191        let output_task = executor.spawn(
192            async move {
193                let mut content_len_buffer = Vec::new();
194                while let Ok(message) = outbound_rx.recv().await {
195                    content_len_buffer.clear();
196                    write!(content_len_buffer, "{}", message.len()).unwrap();
197                    stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
198                    stdin.write_all(&content_len_buffer).await?;
199                    stdin.write_all("\r\n\r\n".as_bytes()).await?;
200                    stdin.write_all(&message).await?;
201                    stdin.flush().await?;
202                }
203                drop(output_done_tx);
204                Ok(())
205            }
206            .log_err(),
207        );
208
209        let (initialized_tx, initialized_rx) = barrier::channel();
210        let (mut capabilities_tx, capabilities_rx) = watch::channel();
211        let this = Arc::new(Self {
212            notification_handlers,
213            response_handlers,
214            capabilities: capabilities_rx,
215            next_id: Default::default(),
216            outbound_tx,
217            executor: executor.clone(),
218            io_tasks: Mutex::new(Some((input_task, output_task))),
219            initialized: initialized_rx,
220            output_done_rx: Mutex::new(Some(output_done_rx)),
221        });
222
223        let root_uri = Url::from_file_path(root_path).map_err(|_| anyhow!("invalid root path"))?;
224        executor
225            .spawn({
226                let this = this.clone();
227                async move {
228                    if let Some(capabilities) = this.init(root_uri).log_err().await {
229                        *capabilities_tx.borrow_mut() = Some(capabilities);
230                    }
231
232                    drop(initialized_tx);
233                }
234            })
235            .detach();
236
237        Ok(this)
238    }
239
240    async fn init(self: Arc<Self>, root_uri: Url) -> Result<ServerCapabilities> {
241        #[allow(deprecated)]
242        let params = InitializeParams {
243            process_id: Default::default(),
244            root_path: Default::default(),
245            root_uri: Some(root_uri),
246            initialization_options: Default::default(),
247            capabilities: ClientCapabilities {
248                text_document: Some(TextDocumentClientCapabilities {
249                    definition: Some(GotoCapability {
250                        link_support: Some(true),
251                        ..Default::default()
252                    }),
253                    code_action: Some(CodeActionClientCapabilities {
254                        code_action_literal_support: Some(CodeActionLiteralSupport {
255                            code_action_kind: CodeActionKindLiteralSupport {
256                                value_set: vec![
257                                    CodeActionKind::REFACTOR.as_str().into(),
258                                    CodeActionKind::QUICKFIX.as_str().into(),
259                                ],
260                            },
261                        }),
262                        data_support: Some(true),
263                        resolve_support: Some(CodeActionCapabilityResolveSupport {
264                            properties: vec!["edit".to_string()],
265                        }),
266                        ..Default::default()
267                    }),
268                    completion: Some(CompletionClientCapabilities {
269                        completion_item: Some(CompletionItemCapability {
270                            snippet_support: Some(true),
271                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
272                                properties: vec!["additionalTextEdits".to_string()],
273                            }),
274                            ..Default::default()
275                        }),
276                        ..Default::default()
277                    }),
278                    ..Default::default()
279                }),
280                experimental: Some(json!({
281                    "serverStatusNotification": true,
282                })),
283                window: Some(WindowClientCapabilities {
284                    work_done_progress: Some(true),
285                    ..Default::default()
286                }),
287                ..Default::default()
288            },
289            trace: Default::default(),
290            workspace_folders: Default::default(),
291            client_info: Default::default(),
292            locale: Default::default(),
293        };
294
295        let this = self.clone();
296        let request = Self::request_internal::<request::Initialize>(
297            &this.next_id,
298            &this.response_handlers,
299            &this.outbound_tx,
300            params,
301        );
302        let response = request.await?;
303        Self::notify_internal::<notification::Initialized>(
304            &this.outbound_tx,
305            InitializedParams {},
306        )?;
307        Ok(response.capabilities)
308    }
309
310    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
311        if let Some(tasks) = self.io_tasks.lock().take() {
312            let response_handlers = self.response_handlers.clone();
313            let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
314            let outbound_tx = self.outbound_tx.clone();
315            let mut output_done = self.output_done_rx.lock().take().unwrap();
316            let shutdown_request = Self::request_internal::<request::Shutdown>(
317                &next_id,
318                &response_handlers,
319                &outbound_tx,
320                (),
321            );
322            let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
323            outbound_tx.close();
324            Some(
325                async move {
326                    shutdown_request.await?;
327                    exit?;
328                    output_done.recv().await;
329                    drop(tasks);
330                    Ok(())
331                }
332                .log_err(),
333            )
334        } else {
335            None
336        }
337    }
338
339    pub fn on_notification<T, F>(&self, mut f: F) -> Subscription
340    where
341        T: notification::Notification,
342        F: 'static + Send + Sync + FnMut(T::Params),
343    {
344        let prev_handler = self.notification_handlers.write().insert(
345            T::METHOD,
346            Box::new(
347                move |notification| match serde_json::from_str(notification) {
348                    Ok(notification) => f(notification),
349                    Err(err) => log::error!("error parsing notification {}: {}", T::METHOD, err),
350                },
351            ),
352        );
353
354        assert!(
355            prev_handler.is_none(),
356            "registered multiple handlers for the same notification"
357        );
358
359        Subscription {
360            method: T::METHOD,
361            notification_handlers: self.notification_handlers.clone(),
362        }
363    }
364
365    pub fn capabilities(&self) -> watch::Receiver<Option<ServerCapabilities>> {
366        self.capabilities.clone()
367    }
368
369    pub fn request<T: request::Request>(
370        self: &Arc<Self>,
371        params: T::Params,
372    ) -> impl Future<Output = Result<T::Result>>
373    where
374        T::Result: 'static + Send,
375    {
376        let this = self.clone();
377        async move {
378            this.initialized.clone().recv().await;
379            Self::request_internal::<T>(
380                &this.next_id,
381                &this.response_handlers,
382                &this.outbound_tx,
383                params,
384            )
385            .await
386        }
387    }
388
389    fn request_internal<T: request::Request>(
390        next_id: &AtomicUsize,
391        response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
392        outbound_tx: &channel::Sender<Vec<u8>>,
393        params: T::Params,
394    ) -> impl 'static + Future<Output = Result<T::Result>>
395    where
396        T::Result: 'static + Send,
397    {
398        let id = next_id.fetch_add(1, SeqCst);
399        let message = serde_json::to_vec(&Request {
400            jsonrpc: JSON_RPC_VERSION,
401            id,
402            method: T::METHOD,
403            params,
404        })
405        .unwrap();
406        let mut response_handlers = response_handlers.lock();
407        let (mut tx, mut rx) = oneshot::channel();
408        response_handlers.insert(
409            id,
410            Box::new(move |result| {
411                let response = match result {
412                    Ok(response) => {
413                        serde_json::from_str(response).context("failed to deserialize response")
414                    }
415                    Err(error) => Err(anyhow!("{}", error.message)),
416                };
417                let _ = tx.try_send(response);
418            }),
419        );
420
421        let send = outbound_tx
422            .try_send(message)
423            .context("failed to write to language server's stdin");
424        async move {
425            send?;
426            rx.recv().await.unwrap()
427        }
428    }
429
430    pub fn notify<T: notification::Notification>(
431        self: &Arc<Self>,
432        params: T::Params,
433    ) -> impl Future<Output = Result<()>> {
434        let this = self.clone();
435        async move {
436            this.initialized.clone().recv().await;
437            Self::notify_internal::<T>(&this.outbound_tx, params)?;
438            Ok(())
439        }
440    }
441
442    fn notify_internal<T: notification::Notification>(
443        outbound_tx: &channel::Sender<Vec<u8>>,
444        params: T::Params,
445    ) -> Result<()> {
446        let message = serde_json::to_vec(&Notification {
447            jsonrpc: JSON_RPC_VERSION,
448            method: T::METHOD,
449            params,
450        })
451        .unwrap();
452        outbound_tx.try_send(message)?;
453        Ok(())
454    }
455}
456
457impl Drop for LanguageServer {
458    fn drop(&mut self) {
459        if let Some(shutdown) = self.shutdown() {
460            self.executor.spawn(shutdown).detach();
461        }
462    }
463}
464
465impl Subscription {
466    pub fn detach(mut self) {
467        self.method = "";
468    }
469}
470
471impl Drop for Subscription {
472    fn drop(&mut self) {
473        self.notification_handlers.write().remove(self.method);
474    }
475}
476
477#[cfg(any(test, feature = "test-support"))]
478pub struct FakeLanguageServer {
479    handlers: Arc<
480        Mutex<
481            HashMap<
482                &'static str,
483                Box<dyn Send + FnMut(usize, &[u8], gpui::AsyncAppContext) -> Vec<u8>>,
484            >,
485        >,
486    >,
487    outgoing_tx: futures::channel::mpsc::UnboundedSender<Vec<u8>>,
488    incoming_rx: futures::channel::mpsc::UnboundedReceiver<Vec<u8>>,
489}
490
491#[cfg(any(test, feature = "test-support"))]
492impl LanguageServer {
493    pub fn fake(cx: &mut gpui::MutableAppContext) -> (Arc<Self>, FakeLanguageServer) {
494        Self::fake_with_capabilities(Default::default(), cx)
495    }
496
497    pub fn fake_with_capabilities(
498        capabilities: ServerCapabilities,
499        cx: &mut gpui::MutableAppContext,
500    ) -> (Arc<Self>, FakeLanguageServer) {
501        let (stdin_writer, stdin_reader) = async_pipe::pipe();
502        let (stdout_writer, stdout_reader) = async_pipe::pipe();
503
504        let mut fake = FakeLanguageServer::new(stdin_reader, stdout_writer, cx);
505        fake.handle_request::<request::Initialize, _>({
506            let capabilities = capabilities.clone();
507            move |_, _| InitializeResult {
508                capabilities: capabilities.clone(),
509                ..Default::default()
510            }
511        });
512
513        let server = Self::new_internal(
514            stdin_writer,
515            stdout_reader,
516            Path::new("/"),
517            cx.background().clone(),
518        )
519        .unwrap();
520
521        (server, fake)
522    }
523}
524
525#[cfg(any(test, feature = "test-support"))]
526impl FakeLanguageServer {
527    fn new(
528        stdin: async_pipe::PipeReader,
529        stdout: async_pipe::PipeWriter,
530        cx: &mut gpui::MutableAppContext,
531    ) -> Self {
532        use futures::StreamExt as _;
533
534        let (incoming_tx, incoming_rx) = futures::channel::mpsc::unbounded();
535        let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded();
536        let this = Self {
537            outgoing_tx: outgoing_tx.clone(),
538            incoming_rx,
539            handlers: Default::default(),
540        };
541
542        // Receive incoming messages
543        let handlers = this.handlers.clone();
544        cx.spawn(|cx| async move {
545            let mut buffer = Vec::new();
546            let mut stdin = smol::io::BufReader::new(stdin);
547            while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
548                cx.background().simulate_random_delay().await;
549                if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
550                    assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
551
552                    if let Some(handler) = handlers.lock().get_mut(request.method) {
553                        let response =
554                            handler(request.id, request.params.get().as_bytes(), cx.clone());
555                        log::debug!("handled lsp request. method:{}", request.method);
556                        outgoing_tx.unbounded_send(response)?;
557                    } else {
558                        log::debug!("unhandled lsp request. method:{}", request.method);
559                        outgoing_tx.unbounded_send(
560                            serde_json::to_vec(&AnyResponse {
561                                id: request.id,
562                                error: Some(Error {
563                                    message: "no handler".to_string(),
564                                }),
565                                result: None,
566                            })
567                            .unwrap(),
568                        )?;
569                    }
570                } else {
571                    incoming_tx.unbounded_send(buffer.clone())?;
572                }
573            }
574            Ok::<_, anyhow::Error>(())
575        })
576        .detach();
577
578        // Send outgoing messages
579        cx.background()
580            .spawn(async move {
581                let mut stdout = smol::io::BufWriter::new(stdout);
582                while let Some(notification) = outgoing_rx.next().await {
583                    Self::send(&mut stdout, &notification).await;
584                }
585            })
586            .detach();
587
588        this
589    }
590
591    pub async fn notify<T: notification::Notification>(&mut self, params: T::Params) {
592        let message = serde_json::to_vec(&Notification {
593            jsonrpc: JSON_RPC_VERSION,
594            method: T::METHOD,
595            params,
596        })
597        .unwrap();
598        self.outgoing_tx.unbounded_send(message).unwrap();
599    }
600
601    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
602        use futures::StreamExt as _;
603
604        loop {
605            let bytes = self.incoming_rx.next().await.unwrap();
606            if let Ok(notification) = serde_json::from_slice::<Notification<T::Params>>(&bytes) {
607                assert_eq!(notification.method, T::METHOD);
608                return notification.params;
609            } else {
610                log::info!(
611                    "skipping message in fake language server {:?}",
612                    std::str::from_utf8(&bytes)
613                );
614            }
615        }
616    }
617
618    pub fn handle_request<T, F>(
619        &mut self,
620        mut handler: F,
621    ) -> futures::channel::mpsc::UnboundedReceiver<()>
622    where
623        T: 'static + request::Request,
624        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> T::Result,
625    {
626        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
627        self.handlers.lock().insert(
628            T::METHOD,
629            Box::new(move |id, params, cx| {
630                let result = handler(serde_json::from_slice::<T::Params>(params).unwrap(), cx);
631                let result = serde_json::to_string(&result).unwrap();
632                let result = serde_json::from_str::<&RawValue>(&result).unwrap();
633                let response = AnyResponse {
634                    id,
635                    error: None,
636                    result: Some(result),
637                };
638                responded_tx.unbounded_send(()).ok();
639                serde_json::to_vec(&response).unwrap()
640            }),
641        );
642        responded_rx
643    }
644
645    pub fn remove_request_handler<T>(&mut self)
646    where
647        T: 'static + request::Request,
648    {
649        self.handlers.lock().remove(T::METHOD);
650    }
651
652    pub async fn start_progress(&mut self, token: impl Into<String>) {
653        self.notify::<notification::Progress>(ProgressParams {
654            token: NumberOrString::String(token.into()),
655            value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
656        })
657        .await;
658    }
659
660    pub async fn end_progress(&mut self, token: impl Into<String>) {
661        self.notify::<notification::Progress>(ProgressParams {
662            token: NumberOrString::String(token.into()),
663            value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
664        })
665        .await;
666    }
667
668    async fn send(stdout: &mut smol::io::BufWriter<async_pipe::PipeWriter>, message: &[u8]) {
669        stdout
670            .write_all(CONTENT_LEN_HEADER.as_bytes())
671            .await
672            .unwrap();
673        stdout
674            .write_all((format!("{}", message.len())).as_bytes())
675            .await
676            .unwrap();
677        stdout.write_all("\r\n\r\n".as_bytes()).await.unwrap();
678        stdout.write_all(&message).await.unwrap();
679        stdout.flush().await.unwrap();
680    }
681
682    async fn receive(
683        stdin: &mut smol::io::BufReader<async_pipe::PipeReader>,
684        buffer: &mut Vec<u8>,
685    ) -> Result<()> {
686        buffer.clear();
687        stdin.read_until(b'\n', buffer).await?;
688        stdin.read_until(b'\n', buffer).await?;
689        let message_len: usize = std::str::from_utf8(buffer)
690            .unwrap()
691            .strip_prefix(CONTENT_LEN_HEADER)
692            .unwrap()
693            .trim_end()
694            .parse()
695            .unwrap();
696        buffer.resize(message_len, 0);
697        stdin.read_exact(buffer).await?;
698        Ok(())
699    }
700}
701
702#[cfg(test)]
703mod tests {
704    use super::*;
705    use gpui::TestAppContext;
706
707    #[ctor::ctor]
708    fn init_logger() {
709        if std::env::var("RUST_LOG").is_ok() {
710            env_logger::init();
711        }
712    }
713
714    #[gpui::test]
715    async fn test_fake(mut cx: TestAppContext) {
716        let (server, mut fake) = cx.update(LanguageServer::fake);
717
718        let (message_tx, message_rx) = channel::unbounded();
719        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
720        server
721            .on_notification::<notification::ShowMessage, _>(move |params| {
722                message_tx.try_send(params).unwrap()
723            })
724            .detach();
725        server
726            .on_notification::<notification::PublishDiagnostics, _>(move |params| {
727                diagnostics_tx.try_send(params).unwrap()
728            })
729            .detach();
730
731        server
732            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
733                text_document: TextDocumentItem::new(
734                    Url::from_str("file://a/b").unwrap(),
735                    "rust".to_string(),
736                    0,
737                    "".to_string(),
738                ),
739            })
740            .await
741            .unwrap();
742        assert_eq!(
743            fake.receive_notification::<notification::DidOpenTextDocument>()
744                .await
745                .text_document
746                .uri
747                .as_str(),
748            "file://a/b"
749        );
750
751        fake.notify::<notification::ShowMessage>(ShowMessageParams {
752            typ: MessageType::ERROR,
753            message: "ok".to_string(),
754        })
755        .await;
756        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
757            uri: Url::from_str("file://b/c").unwrap(),
758            version: Some(5),
759            diagnostics: vec![],
760        })
761        .await;
762        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
763        assert_eq!(
764            diagnostics_rx.recv().await.unwrap().uri.as_str(),
765            "file://b/c"
766        );
767
768        fake.handle_request::<request::Shutdown, _>(|_, _| ());
769
770        drop(server);
771        fake.receive_notification::<notification::Exit>().await;
772    }
773
774    pub enum ServerStatusNotification {}
775
776    impl notification::Notification for ServerStatusNotification {
777        type Params = ServerStatusParams;
778        const METHOD: &'static str = "experimental/serverStatus";
779    }
780
781    #[derive(Deserialize, Serialize, PartialEq, Eq, Clone)]
782    pub struct ServerStatusParams {
783        pub quiescent: bool,
784    }
785}