lsp.rs

  1use anyhow::{anyhow, Context, Result};
  2use collections::HashMap;
  3use futures::{channel::oneshot, io::BufWriter, AsyncRead, AsyncWrite};
  4use gpui::{executor, Task};
  5use parking_lot::{Mutex, RwLock};
  6use postage::{barrier, prelude::Stream, watch};
  7use serde::{Deserialize, Serialize};
  8use serde_json::{json, value::RawValue, Value};
  9use smol::{
 10    channel,
 11    io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
 12    process::Command,
 13};
 14use std::{
 15    future::Future,
 16    io::Write,
 17    str::FromStr,
 18    sync::{
 19        atomic::{AtomicUsize, Ordering::SeqCst},
 20        Arc,
 21    },
 22};
 23use std::{path::Path, process::Stdio};
 24use util::TryFutureExt;
 25
 26pub use lsp_types::*;
 27
 28const JSON_RPC_VERSION: &'static str = "2.0";
 29const CONTENT_LEN_HEADER: &'static str = "Content-Length: ";
 30
 31type NotificationHandler = Box<dyn Send + Sync + FnMut(&str)>;
 32type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
 33
 34pub struct LanguageServer {
 35    next_id: AtomicUsize,
 36    outbound_tx: channel::Sender<Vec<u8>>,
 37    capabilities: watch::Receiver<Option<ServerCapabilities>>,
 38    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
 39    response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
 40    executor: Arc<executor::Background>,
 41    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
 42    initialized: barrier::Receiver,
 43    output_done_rx: Mutex<Option<barrier::Receiver>>,
 44}
 45
 46pub struct Subscription {
 47    method: &'static str,
 48    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
 49}
 50
 51#[derive(Serialize, Deserialize)]
 52struct Request<'a, T> {
 53    jsonrpc: &'a str,
 54    id: usize,
 55    method: &'a str,
 56    params: T,
 57}
 58
 59#[cfg(any(test, feature = "test-support"))]
 60#[derive(Deserialize)]
 61struct AnyRequest<'a> {
 62    id: usize,
 63    #[serde(borrow)]
 64    jsonrpc: &'a str,
 65    #[serde(borrow)]
 66    method: &'a str,
 67    #[serde(borrow)]
 68    params: &'a RawValue,
 69}
 70
 71#[derive(Serialize, Deserialize)]
 72struct AnyResponse<'a> {
 73    id: usize,
 74    #[serde(default)]
 75    error: Option<Error>,
 76    #[serde(borrow)]
 77    result: Option<&'a RawValue>,
 78}
 79
 80#[derive(Serialize, Deserialize)]
 81struct Notification<'a, T> {
 82    #[serde(borrow)]
 83    jsonrpc: &'a str,
 84    #[serde(borrow)]
 85    method: &'a str,
 86    params: T,
 87}
 88
 89#[derive(Deserialize)]
 90struct AnyNotification<'a> {
 91    #[serde(borrow)]
 92    method: &'a str,
 93    #[serde(borrow)]
 94    params: &'a RawValue,
 95}
 96
 97#[derive(Debug, Serialize, Deserialize)]
 98struct Error {
 99    message: String,
100}
101
102impl LanguageServer {
103    pub fn new(
104        binary_path: &Path,
105        root_path: &Path,
106        background: Arc<executor::Background>,
107    ) -> Result<Arc<Self>> {
108        let mut server = Command::new(binary_path)
109            .stdin(Stdio::piped())
110            .stdout(Stdio::piped())
111            .stderr(Stdio::inherit())
112            .spawn()?;
113        let stdin = server.stdin.take().unwrap();
114        let stdout = server.stdout.take().unwrap();
115        Self::new_internal(stdin, stdout, root_path, background)
116    }
117
118    fn new_internal<Stdin, Stdout>(
119        stdin: Stdin,
120        stdout: Stdout,
121        root_path: &Path,
122        executor: Arc<executor::Background>,
123    ) -> Result<Arc<Self>>
124    where
125        Stdin: AsyncWrite + Unpin + Send + 'static,
126        Stdout: AsyncRead + Unpin + Send + 'static,
127    {
128        let mut stdin = BufWriter::new(stdin);
129        let mut stdout = BufReader::new(stdout);
130        let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
131        let notification_handlers =
132            Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::default()));
133        let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::default()));
134        let input_task = executor.spawn(
135            {
136                let notification_handlers = notification_handlers.clone();
137                let response_handlers = response_handlers.clone();
138                async move {
139                    let _clear_response_handlers = ClearResponseHandlers(response_handlers.clone());
140                    let mut buffer = Vec::new();
141                    loop {
142                        buffer.clear();
143                        stdout.read_until(b'\n', &mut buffer).await?;
144                        stdout.read_until(b'\n', &mut buffer).await?;
145                        let message_len: usize = std::str::from_utf8(&buffer)?
146                            .strip_prefix(CONTENT_LEN_HEADER)
147                            .ok_or_else(|| anyhow!("invalid header"))?
148                            .trim_end()
149                            .parse()?;
150
151                        buffer.resize(message_len, 0);
152                        stdout.read_exact(&mut buffer).await?;
153
154                        if let Ok(AnyNotification { method, params }) =
155                            serde_json::from_slice(&buffer)
156                        {
157                            if let Some(handler) = notification_handlers.write().get_mut(method) {
158                                handler(params.get());
159                            } else {
160                                log::info!(
161                                    "unhandled notification {}:\n{}",
162                                    method,
163                                    serde_json::to_string_pretty(
164                                        &Value::from_str(params.get()).unwrap()
165                                    )
166                                    .unwrap()
167                                );
168                            }
169                        } else if let Ok(AnyResponse { id, error, result }) =
170                            serde_json::from_slice(&buffer)
171                        {
172                            if let Some(handler) = response_handlers.lock().remove(&id) {
173                                if let Some(error) = error {
174                                    handler(Err(error));
175                                } else if let Some(result) = result {
176                                    handler(Ok(result.get()));
177                                } else {
178                                    handler(Ok("null"));
179                                }
180                            }
181                        } else {
182                            return Err(anyhow!(
183                                "failed to deserialize message:\n{}",
184                                std::str::from_utf8(&buffer)?
185                            ));
186                        }
187                    }
188                }
189            }
190            .log_err(),
191        );
192        let (output_done_tx, output_done_rx) = barrier::channel();
193        let output_task = executor.spawn({
194            let response_handlers = response_handlers.clone();
195            async move {
196                let _clear_response_handlers = ClearResponseHandlers(response_handlers);
197                let mut content_len_buffer = Vec::new();
198                while let Ok(message) = outbound_rx.recv().await {
199                    content_len_buffer.clear();
200                    write!(content_len_buffer, "{}", message.len()).unwrap();
201                    stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
202                    stdin.write_all(&content_len_buffer).await?;
203                    stdin.write_all("\r\n\r\n".as_bytes()).await?;
204                    stdin.write_all(&message).await?;
205                    stdin.flush().await?;
206                }
207                drop(output_done_tx);
208                Ok(())
209            }
210            .log_err()
211        });
212
213        let (initialized_tx, initialized_rx) = barrier::channel();
214        let (mut capabilities_tx, capabilities_rx) = watch::channel();
215        let this = Arc::new(Self {
216            notification_handlers,
217            response_handlers,
218            capabilities: capabilities_rx,
219            next_id: Default::default(),
220            outbound_tx,
221            executor: executor.clone(),
222            io_tasks: Mutex::new(Some((input_task, output_task))),
223            initialized: initialized_rx,
224            output_done_rx: Mutex::new(Some(output_done_rx)),
225        });
226
227        let root_uri = Url::from_file_path(root_path).map_err(|_| anyhow!("invalid root path"))?;
228        executor
229            .spawn({
230                let this = this.clone();
231                async move {
232                    if let Some(capabilities) = this.init(root_uri).log_err().await {
233                        *capabilities_tx.borrow_mut() = Some(capabilities);
234                    }
235
236                    drop(initialized_tx);
237                }
238            })
239            .detach();
240
241        Ok(this)
242    }
243
244    async fn init(self: Arc<Self>, root_uri: Url) -> Result<ServerCapabilities> {
245        #[allow(deprecated)]
246        let params = InitializeParams {
247            process_id: Default::default(),
248            root_path: Default::default(),
249            root_uri: Some(root_uri),
250            initialization_options: Default::default(),
251            capabilities: ClientCapabilities {
252                text_document: Some(TextDocumentClientCapabilities {
253                    definition: Some(GotoCapability {
254                        link_support: Some(true),
255                        ..Default::default()
256                    }),
257                    code_action: Some(CodeActionClientCapabilities {
258                        code_action_literal_support: Some(CodeActionLiteralSupport {
259                            code_action_kind: CodeActionKindLiteralSupport {
260                                value_set: vec![
261                                    CodeActionKind::REFACTOR.as_str().into(),
262                                    CodeActionKind::QUICKFIX.as_str().into(),
263                                ],
264                            },
265                        }),
266                        data_support: Some(true),
267                        resolve_support: Some(CodeActionCapabilityResolveSupport {
268                            properties: vec!["edit".to_string()],
269                        }),
270                        ..Default::default()
271                    }),
272                    completion: Some(CompletionClientCapabilities {
273                        completion_item: Some(CompletionItemCapability {
274                            snippet_support: Some(true),
275                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
276                                properties: vec!["additionalTextEdits".to_string()],
277                            }),
278                            ..Default::default()
279                        }),
280                        ..Default::default()
281                    }),
282                    ..Default::default()
283                }),
284                experimental: Some(json!({
285                    "serverStatusNotification": true,
286                })),
287                window: Some(WindowClientCapabilities {
288                    work_done_progress: Some(true),
289                    ..Default::default()
290                }),
291                ..Default::default()
292            },
293            trace: Default::default(),
294            workspace_folders: Default::default(),
295            client_info: Default::default(),
296            locale: Default::default(),
297        };
298
299        let this = self.clone();
300        let request = Self::request_internal::<request::Initialize>(
301            &this.next_id,
302            &this.response_handlers,
303            &this.outbound_tx,
304            params,
305        );
306        let response = request.await?;
307        Self::notify_internal::<notification::Initialized>(
308            &this.outbound_tx,
309            InitializedParams {},
310        )?;
311        Ok(response.capabilities)
312    }
313
314    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
315        if let Some(tasks) = self.io_tasks.lock().take() {
316            let response_handlers = self.response_handlers.clone();
317            let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
318            let outbound_tx = self.outbound_tx.clone();
319            let mut output_done = self.output_done_rx.lock().take().unwrap();
320            let shutdown_request = Self::request_internal::<request::Shutdown>(
321                &next_id,
322                &response_handlers,
323                &outbound_tx,
324                (),
325            );
326            let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
327            outbound_tx.close();
328            Some(
329                async move {
330                    log::debug!("language server shutdown started");
331                    shutdown_request.await?;
332                    response_handlers.lock().clear();
333                    exit?;
334                    output_done.recv().await;
335                    log::debug!("language server shutdown finished");
336                    drop(tasks);
337                    Ok(())
338                }
339                .log_err(),
340            )
341        } else {
342            None
343        }
344    }
345
346    pub fn on_notification<T, F>(&self, mut f: F) -> Subscription
347    where
348        T: notification::Notification,
349        F: 'static + Send + Sync + FnMut(T::Params),
350    {
351        let prev_handler = self.notification_handlers.write().insert(
352            T::METHOD,
353            Box::new(
354                move |notification| match serde_json::from_str(notification) {
355                    Ok(notification) => f(notification),
356                    Err(err) => log::error!("error parsing notification {}: {}", T::METHOD, err),
357                },
358            ),
359        );
360
361        assert!(
362            prev_handler.is_none(),
363            "registered multiple handlers for the same notification"
364        );
365
366        Subscription {
367            method: T::METHOD,
368            notification_handlers: self.notification_handlers.clone(),
369        }
370    }
371
372    pub fn capabilities(&self) -> watch::Receiver<Option<ServerCapabilities>> {
373        self.capabilities.clone()
374    }
375
376    pub fn request<T: request::Request>(
377        self: &Arc<Self>,
378        params: T::Params,
379    ) -> impl Future<Output = Result<T::Result>>
380    where
381        T::Result: 'static + Send,
382    {
383        let this = self.clone();
384        async move {
385            this.initialized.clone().recv().await;
386            Self::request_internal::<T>(
387                &this.next_id,
388                &this.response_handlers,
389                &this.outbound_tx,
390                params,
391            )
392            .await
393        }
394    }
395
396    fn request_internal<T: request::Request>(
397        next_id: &AtomicUsize,
398        response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
399        outbound_tx: &channel::Sender<Vec<u8>>,
400        params: T::Params,
401    ) -> impl 'static + Future<Output = Result<T::Result>>
402    where
403        T::Result: 'static + Send,
404    {
405        let id = next_id.fetch_add(1, SeqCst);
406        let message = serde_json::to_vec(&Request {
407            jsonrpc: JSON_RPC_VERSION,
408            id,
409            method: T::METHOD,
410            params,
411        })
412        .unwrap();
413
414        let send = outbound_tx
415            .try_send(message)
416            .context("failed to write to language server's stdin");
417
418        let (tx, rx) = oneshot::channel();
419        response_handlers.lock().insert(
420            id,
421            Box::new(move |result| {
422                let response = match result {
423                    Ok(response) => {
424                        serde_json::from_str(response).context("failed to deserialize response")
425                    }
426                    Err(error) => Err(anyhow!("{}", error.message)),
427                };
428                let _ = tx.send(response);
429            }),
430        );
431
432        async move {
433            send?;
434            rx.await?
435        }
436    }
437
438    pub fn notify<T: notification::Notification>(
439        self: &Arc<Self>,
440        params: T::Params,
441    ) -> impl Future<Output = Result<()>> {
442        let this = self.clone();
443        async move {
444            this.initialized.clone().recv().await;
445            Self::notify_internal::<T>(&this.outbound_tx, params)?;
446            Ok(())
447        }
448    }
449
450    fn notify_internal<T: notification::Notification>(
451        outbound_tx: &channel::Sender<Vec<u8>>,
452        params: T::Params,
453    ) -> Result<()> {
454        let message = serde_json::to_vec(&Notification {
455            jsonrpc: JSON_RPC_VERSION,
456            method: T::METHOD,
457            params,
458        })
459        .unwrap();
460        outbound_tx.try_send(message)?;
461        Ok(())
462    }
463}
464
465impl Drop for LanguageServer {
466    fn drop(&mut self) {
467        if let Some(shutdown) = self.shutdown() {
468            self.executor.spawn(shutdown).detach();
469        }
470    }
471}
472
473impl Subscription {
474    pub fn detach(mut self) {
475        self.method = "";
476    }
477}
478
479impl Drop for Subscription {
480    fn drop(&mut self) {
481        self.notification_handlers.write().remove(self.method);
482    }
483}
484
485#[cfg(any(test, feature = "test-support"))]
486pub struct FakeLanguageServer {
487    handlers: FakeLanguageServerHandlers,
488    outgoing_tx: futures::channel::mpsc::UnboundedSender<Vec<u8>>,
489    incoming_rx: futures::channel::mpsc::UnboundedReceiver<Vec<u8>>,
490    _input_task: Task<Result<()>>,
491    _output_task: Task<Result<()>>,
492}
493
494#[cfg(any(test, feature = "test-support"))]
495type FakeLanguageServerHandlers = Arc<
496    Mutex<
497        HashMap<
498            &'static str,
499            Box<dyn Send + FnMut(usize, &[u8], gpui::AsyncAppContext) -> Vec<u8>>,
500        >,
501    >,
502>;
503
504#[cfg(any(test, feature = "test-support"))]
505impl LanguageServer {
506    pub fn fake(cx: &mut gpui::MutableAppContext) -> (Arc<Self>, FakeLanguageServer) {
507        Self::fake_with_capabilities(Default::default(), cx)
508    }
509
510    pub fn fake_with_capabilities(
511        capabilities: ServerCapabilities,
512        cx: &mut gpui::MutableAppContext,
513    ) -> (Arc<Self>, FakeLanguageServer) {
514        let (stdin_writer, stdin_reader) = async_pipe::pipe();
515        let (stdout_writer, stdout_reader) = async_pipe::pipe();
516
517        let mut fake = FakeLanguageServer::new(stdin_reader, stdout_writer, cx);
518        fake.handle_request::<request::Initialize, _>({
519            let capabilities = capabilities.clone();
520            move |_, _| InitializeResult {
521                capabilities: capabilities.clone(),
522                ..Default::default()
523            }
524        });
525
526        let server = Self::new_internal(
527            stdin_writer,
528            stdout_reader,
529            Path::new("/"),
530            cx.background().clone(),
531        )
532        .unwrap();
533
534        (server, fake)
535    }
536}
537
538#[cfg(any(test, feature = "test-support"))]
539impl FakeLanguageServer {
540    fn new(
541        stdin: async_pipe::PipeReader,
542        stdout: async_pipe::PipeWriter,
543        cx: &mut gpui::MutableAppContext,
544    ) -> Self {
545        use futures::StreamExt as _;
546
547        let (incoming_tx, incoming_rx) = futures::channel::mpsc::unbounded();
548        let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded();
549        let handlers = FakeLanguageServerHandlers::default();
550
551        let input_task = cx.spawn(|cx| {
552            let handlers = handlers.clone();
553            let outgoing_tx = outgoing_tx.clone();
554            async move {
555                let mut buffer = Vec::new();
556                let mut stdin = smol::io::BufReader::new(stdin);
557                while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
558                    cx.background().simulate_random_delay().await;
559                    if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
560                        assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
561
562                        let response;
563                        if let Some(handler) = handlers.lock().get_mut(request.method) {
564                            response =
565                                handler(request.id, request.params.get().as_bytes(), cx.clone());
566                            log::debug!("handled lsp request. method:{}", request.method);
567                        } else {
568                            response = serde_json::to_vec(&AnyResponse {
569                                id: request.id,
570                                error: Some(Error {
571                                    message: "no handler".to_string(),
572                                }),
573                                result: None,
574                            })
575                            .unwrap();
576                            log::debug!("unhandled lsp request. method:{}", request.method);
577                        }
578                        outgoing_tx.unbounded_send(response)?;
579                    } else {
580                        incoming_tx.unbounded_send(buffer.clone())?;
581                    }
582                }
583                Ok::<_, anyhow::Error>(())
584            }
585        });
586
587        let output_task = cx.background().spawn(async move {
588            let mut stdout = smol::io::BufWriter::new(stdout);
589            while let Some(message) = outgoing_rx.next().await {
590                stdout
591                    .write_all(CONTENT_LEN_HEADER.as_bytes())
592                    .await
593                    .unwrap();
594                stdout
595                    .write_all((format!("{}", message.len())).as_bytes())
596                    .await
597                    .unwrap();
598                stdout.write_all("\r\n\r\n".as_bytes()).await.unwrap();
599                stdout.write_all(&message).await.unwrap();
600                stdout.flush().await.unwrap();
601            }
602            Ok(())
603        });
604
605        Self {
606            outgoing_tx,
607            incoming_rx,
608            handlers,
609            _input_task: input_task,
610            _output_task: output_task,
611        }
612    }
613
614    pub async fn notify<T: notification::Notification>(&mut self, params: T::Params) {
615        let message = serde_json::to_vec(&Notification {
616            jsonrpc: JSON_RPC_VERSION,
617            method: T::METHOD,
618            params,
619        })
620        .unwrap();
621        self.outgoing_tx.unbounded_send(message).unwrap();
622    }
623
624    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
625        use futures::StreamExt as _;
626
627        loop {
628            let bytes = self.incoming_rx.next().await.unwrap();
629            if let Ok(notification) = serde_json::from_slice::<Notification<T::Params>>(&bytes) {
630                assert_eq!(notification.method, T::METHOD);
631                return notification.params;
632            } else {
633                log::info!(
634                    "skipping message in fake language server {:?}",
635                    std::str::from_utf8(&bytes)
636                );
637            }
638        }
639    }
640
641    pub fn handle_request<T, F>(
642        &mut self,
643        mut handler: F,
644    ) -> futures::channel::mpsc::UnboundedReceiver<()>
645    where
646        T: 'static + request::Request,
647        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> T::Result,
648    {
649        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
650        self.handlers.lock().insert(
651            T::METHOD,
652            Box::new(move |id, params, cx| {
653                let result = handler(serde_json::from_slice::<T::Params>(params).unwrap(), cx);
654                let result = serde_json::to_string(&result).unwrap();
655                let result = serde_json::from_str::<&RawValue>(&result).unwrap();
656                let response = AnyResponse {
657                    id,
658                    error: None,
659                    result: Some(result),
660                };
661                responded_tx.unbounded_send(()).ok();
662                serde_json::to_vec(&response).unwrap()
663            }),
664        );
665        responded_rx
666    }
667
668    pub fn remove_request_handler<T>(&mut self)
669    where
670        T: 'static + request::Request,
671    {
672        self.handlers.lock().remove(T::METHOD);
673    }
674
675    pub async fn start_progress(&mut self, token: impl Into<String>) {
676        self.notify::<notification::Progress>(ProgressParams {
677            token: NumberOrString::String(token.into()),
678            value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
679        })
680        .await;
681    }
682
683    pub async fn end_progress(&mut self, token: impl Into<String>) {
684        self.notify::<notification::Progress>(ProgressParams {
685            token: NumberOrString::String(token.into()),
686            value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
687        })
688        .await;
689    }
690
691    async fn receive(
692        stdin: &mut smol::io::BufReader<async_pipe::PipeReader>,
693        buffer: &mut Vec<u8>,
694    ) -> Result<()> {
695        buffer.clear();
696        stdin.read_until(b'\n', buffer).await?;
697        stdin.read_until(b'\n', buffer).await?;
698        let message_len: usize = std::str::from_utf8(buffer)
699            .unwrap()
700            .strip_prefix(CONTENT_LEN_HEADER)
701            .ok_or_else(|| anyhow!("invalid content length header"))?
702            .trim_end()
703            .parse()
704            .unwrap();
705        buffer.resize(message_len, 0);
706        stdin.read_exact(buffer).await?;
707        Ok(())
708    }
709}
710
711struct ClearResponseHandlers(Arc<Mutex<HashMap<usize, ResponseHandler>>>);
712
713impl Drop for ClearResponseHandlers {
714    fn drop(&mut self) {
715        self.0.lock().clear();
716    }
717}
718
719#[cfg(test)]
720mod tests {
721    use super::*;
722    use gpui::TestAppContext;
723
724    #[ctor::ctor]
725    fn init_logger() {
726        if std::env::var("RUST_LOG").is_ok() {
727            env_logger::init();
728        }
729    }
730
731    #[gpui::test]
732    async fn test_fake(cx: &mut TestAppContext) {
733        let (server, mut fake) = cx.update(LanguageServer::fake);
734
735        let (message_tx, message_rx) = channel::unbounded();
736        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
737        server
738            .on_notification::<notification::ShowMessage, _>(move |params| {
739                message_tx.try_send(params).unwrap()
740            })
741            .detach();
742        server
743            .on_notification::<notification::PublishDiagnostics, _>(move |params| {
744                diagnostics_tx.try_send(params).unwrap()
745            })
746            .detach();
747
748        server
749            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
750                text_document: TextDocumentItem::new(
751                    Url::from_str("file://a/b").unwrap(),
752                    "rust".to_string(),
753                    0,
754                    "".to_string(),
755                ),
756            })
757            .await
758            .unwrap();
759        assert_eq!(
760            fake.receive_notification::<notification::DidOpenTextDocument>()
761                .await
762                .text_document
763                .uri
764                .as_str(),
765            "file://a/b"
766        );
767
768        fake.notify::<notification::ShowMessage>(ShowMessageParams {
769            typ: MessageType::ERROR,
770            message: "ok".to_string(),
771        })
772        .await;
773        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
774            uri: Url::from_str("file://b/c").unwrap(),
775            version: Some(5),
776            diagnostics: vec![],
777        })
778        .await;
779        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
780        assert_eq!(
781            diagnostics_rx.recv().await.unwrap().uri.as_str(),
782            "file://b/c"
783        );
784
785        fake.handle_request::<request::Shutdown, _>(|_, _| ());
786
787        drop(server);
788        fake.receive_notification::<notification::Exit>().await;
789    }
790
791    pub enum ServerStatusNotification {}
792
793    impl notification::Notification for ServerStatusNotification {
794        type Params = ServerStatusParams;
795        const METHOD: &'static str = "experimental/serverStatus";
796    }
797
798    #[derive(Deserialize, Serialize, PartialEq, Eq, Clone)]
799    pub struct ServerStatusParams {
800        pub quiescent: bool,
801    }
802}