lsp.rs

  1use anyhow::{anyhow, Context, Result};
  2use futures::{io::BufWriter, AsyncRead, AsyncWrite};
  3use gpui::{executor, Task};
  4use parking_lot::{Mutex, RwLock};
  5use postage::{barrier, oneshot, prelude::Stream, sink::Sink, watch};
  6use serde::{Deserialize, Serialize};
  7use serde_json::{json, value::RawValue, Value};
  8use smol::{
  9    channel,
 10    io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
 11    process::Command,
 12};
 13use std::{
 14    collections::HashMap,
 15    future::Future,
 16    io::Write,
 17    str::FromStr,
 18    sync::{
 19        atomic::{AtomicUsize, Ordering::SeqCst},
 20        Arc,
 21    },
 22};
 23use std::{path::Path, process::Stdio};
 24use util::TryFutureExt;
 25
 26pub use lsp_types::*;
 27
 28const JSON_RPC_VERSION: &'static str = "2.0";
 29const CONTENT_LEN_HEADER: &'static str = "Content-Length: ";
 30
 31type NotificationHandler = Box<dyn Send + Sync + FnMut(&str)>;
 32type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
 33
 34pub struct LanguageServer {
 35    next_id: AtomicUsize,
 36    outbound_tx: RwLock<Option<channel::Sender<Vec<u8>>>>,
 37    capabilities: watch::Receiver<Option<ServerCapabilities>>,
 38    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
 39    response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
 40    executor: Arc<executor::Background>,
 41    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
 42    initialized: barrier::Receiver,
 43    output_done_rx: Mutex<Option<barrier::Receiver>>,
 44}
 45
 46pub struct Subscription {
 47    method: &'static str,
 48    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
 49}
 50
 51#[derive(Serialize, Deserialize)]
 52struct Request<'a, T> {
 53    jsonrpc: &'a str,
 54    id: usize,
 55    method: &'a str,
 56    params: T,
 57}
 58
 59#[cfg(any(test, feature = "test-support"))]
 60#[derive(Deserialize)]
 61struct AnyRequest<'a> {
 62    id: usize,
 63    #[serde(borrow)]
 64    jsonrpc: &'a str,
 65    #[serde(borrow)]
 66    method: &'a str,
 67    #[serde(borrow)]
 68    params: &'a RawValue,
 69}
 70
 71#[derive(Serialize, Deserialize)]
 72struct AnyResponse<'a> {
 73    id: usize,
 74    #[serde(default)]
 75    error: Option<Error>,
 76    #[serde(borrow)]
 77    result: Option<&'a RawValue>,
 78}
 79
 80#[derive(Serialize, Deserialize)]
 81struct Notification<'a, T> {
 82    #[serde(borrow)]
 83    jsonrpc: &'a str,
 84    #[serde(borrow)]
 85    method: &'a str,
 86    params: T,
 87}
 88
 89#[derive(Deserialize)]
 90struct AnyNotification<'a> {
 91    #[serde(borrow)]
 92    method: &'a str,
 93    #[serde(borrow)]
 94    params: &'a RawValue,
 95}
 96
 97#[derive(Debug, Serialize, Deserialize)]
 98struct Error {
 99    message: String,
100}
101
102impl LanguageServer {
103    pub fn new(
104        binary_path: &Path,
105        root_path: &Path,
106        background: Arc<executor::Background>,
107    ) -> Result<Arc<Self>> {
108        let mut server = Command::new(binary_path)
109            .stdin(Stdio::piped())
110            .stdout(Stdio::piped())
111            .stderr(Stdio::inherit())
112            .spawn()?;
113        let stdin = server.stdin.take().unwrap();
114        let stdout = server.stdout.take().unwrap();
115        Self::new_internal(stdin, stdout, root_path, background)
116    }
117
118    fn new_internal<Stdin, Stdout>(
119        stdin: Stdin,
120        stdout: Stdout,
121        root_path: &Path,
122        executor: Arc<executor::Background>,
123    ) -> Result<Arc<Self>>
124    where
125        Stdin: AsyncWrite + Unpin + Send + 'static,
126        Stdout: AsyncRead + Unpin + Send + 'static,
127    {
128        let mut stdin = BufWriter::new(stdin);
129        let mut stdout = BufReader::new(stdout);
130        let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
131        let notification_handlers = Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::new()));
132        let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::new()));
133        let input_task = executor.spawn(
134            {
135                let notification_handlers = notification_handlers.clone();
136                let response_handlers = response_handlers.clone();
137                async move {
138                    let mut buffer = Vec::new();
139                    loop {
140                        buffer.clear();
141                        stdout.read_until(b'\n', &mut buffer).await?;
142                        stdout.read_until(b'\n', &mut buffer).await?;
143                        let message_len: usize = std::str::from_utf8(&buffer)?
144                            .strip_prefix(CONTENT_LEN_HEADER)
145                            .ok_or_else(|| anyhow!("invalid header"))?
146                            .trim_end()
147                            .parse()?;
148
149                        buffer.resize(message_len, 0);
150                        stdout.read_exact(&mut buffer).await?;
151
152                        if let Ok(AnyNotification { method, params }) =
153                            serde_json::from_slice(&buffer)
154                        {
155                            if let Some(handler) = notification_handlers.write().get_mut(method) {
156                                handler(params.get());
157                            } else {
158                                log::info!(
159                                    "unhandled notification {}:\n{}",
160                                    method,
161                                    serde_json::to_string_pretty(
162                                        &Value::from_str(params.get()).unwrap()
163                                    )
164                                    .unwrap()
165                                );
166                            }
167                        } else if let Ok(AnyResponse { id, error, result }) =
168                            serde_json::from_slice(&buffer)
169                        {
170                            if let Some(handler) = response_handlers.lock().remove(&id) {
171                                if let Some(error) = error {
172                                    handler(Err(error));
173                                } else if let Some(result) = result {
174                                    handler(Ok(result.get()));
175                                } else {
176                                    handler(Ok("null"));
177                                }
178                            }
179                        } else {
180                            return Err(anyhow!(
181                                "failed to deserialize message:\n{}",
182                                std::str::from_utf8(&buffer)?
183                            ));
184                        }
185                    }
186                }
187            }
188            .log_err(),
189        );
190        let (output_done_tx, output_done_rx) = barrier::channel();
191        let output_task = executor.spawn(
192            async move {
193                let mut content_len_buffer = Vec::new();
194                while let Ok(message) = outbound_rx.recv().await {
195                    content_len_buffer.clear();
196                    write!(content_len_buffer, "{}", message.len()).unwrap();
197                    stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
198                    stdin.write_all(&content_len_buffer).await?;
199                    stdin.write_all("\r\n\r\n".as_bytes()).await?;
200                    stdin.write_all(&message).await?;
201                    stdin.flush().await?;
202                }
203                drop(output_done_tx);
204                Ok(())
205            }
206            .log_err(),
207        );
208
209        let (initialized_tx, initialized_rx) = barrier::channel();
210        let (mut capabilities_tx, capabilities_rx) = watch::channel();
211        let this = Arc::new(Self {
212            notification_handlers,
213            response_handlers,
214            capabilities: capabilities_rx,
215            next_id: Default::default(),
216            outbound_tx: RwLock::new(Some(outbound_tx)),
217            executor: executor.clone(),
218            io_tasks: Mutex::new(Some((input_task, output_task))),
219            initialized: initialized_rx,
220            output_done_rx: Mutex::new(Some(output_done_rx)),
221        });
222
223        let root_uri = Url::from_file_path(root_path).map_err(|_| anyhow!("invalid root path"))?;
224        executor
225            .spawn({
226                let this = this.clone();
227                async move {
228                    if let Some(capabilities) = this.init(root_uri).log_err().await {
229                        *capabilities_tx.borrow_mut() = Some(capabilities);
230                    }
231
232                    drop(initialized_tx);
233                }
234            })
235            .detach();
236
237        Ok(this)
238    }
239
240    async fn init(self: Arc<Self>, root_uri: Url) -> Result<ServerCapabilities> {
241        #[allow(deprecated)]
242        let params = InitializeParams {
243            process_id: Default::default(),
244            root_path: Default::default(),
245            root_uri: Some(root_uri),
246            initialization_options: Default::default(),
247            capabilities: ClientCapabilities {
248                text_document: Some(TextDocumentClientCapabilities {
249                    definition: Some(GotoCapability {
250                        link_support: Some(true),
251                        ..Default::default()
252                    }),
253                    code_action: Some(CodeActionClientCapabilities {
254                        code_action_literal_support: Some(CodeActionLiteralSupport {
255                            code_action_kind: CodeActionKindLiteralSupport {
256                                value_set: vec![
257                                    CodeActionKind::REFACTOR.as_str().into(),
258                                    CodeActionKind::QUICKFIX.as_str().into(),
259                                ],
260                            },
261                        }),
262                        data_support: Some(true),
263                        resolve_support: Some(CodeActionCapabilityResolveSupport {
264                            properties: vec!["edit".to_string()],
265                        }),
266                        ..Default::default()
267                    }),
268                    completion: Some(CompletionClientCapabilities {
269                        completion_item: Some(CompletionItemCapability {
270                            snippet_support: Some(true),
271                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
272                                properties: vec!["additionalTextEdits".to_string()],
273                            }),
274                            ..Default::default()
275                        }),
276                        ..Default::default()
277                    }),
278                    ..Default::default()
279                }),
280                experimental: Some(json!({
281                    "serverStatusNotification": true,
282                })),
283                window: Some(WindowClientCapabilities {
284                    work_done_progress: Some(true),
285                    ..Default::default()
286                }),
287                ..Default::default()
288            },
289            trace: Default::default(),
290            workspace_folders: Default::default(),
291            client_info: Default::default(),
292            locale: Default::default(),
293        };
294
295        let this = self.clone();
296        let request = Self::request_internal::<request::Initialize>(
297            &this.next_id,
298            &this.response_handlers,
299            this.outbound_tx.read().as_ref(),
300            params,
301        );
302        let response = request.await?;
303        Self::notify_internal::<notification::Initialized>(
304            this.outbound_tx.read().as_ref(),
305            InitializedParams {},
306        )?;
307        Ok(response.capabilities)
308    }
309
310    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Result<()>>> {
311        if let Some(tasks) = self.io_tasks.lock().take() {
312            let response_handlers = self.response_handlers.clone();
313            let outbound_tx = self.outbound_tx.write().take();
314            let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
315            let mut output_done = self.output_done_rx.lock().take().unwrap();
316            Some(async move {
317                Self::request_internal::<request::Shutdown>(
318                    &next_id,
319                    &response_handlers,
320                    outbound_tx.as_ref(),
321                    (),
322                )
323                .await?;
324                Self::notify_internal::<notification::Exit>(outbound_tx.as_ref(), ())?;
325                drop(outbound_tx);
326                output_done.recv().await;
327                drop(tasks);
328                Ok(())
329            })
330        } else {
331            None
332        }
333    }
334
335    pub fn on_notification<T, F>(&self, mut f: F) -> Subscription
336    where
337        T: notification::Notification,
338        F: 'static + Send + Sync + FnMut(T::Params),
339    {
340        let prev_handler = self.notification_handlers.write().insert(
341            T::METHOD,
342            Box::new(
343                move |notification| match serde_json::from_str(notification) {
344                    Ok(notification) => f(notification),
345                    Err(err) => log::error!("error parsing notification {}: {}", T::METHOD, err),
346                },
347            ),
348        );
349
350        assert!(
351            prev_handler.is_none(),
352            "registered multiple handlers for the same notification"
353        );
354
355        Subscription {
356            method: T::METHOD,
357            notification_handlers: self.notification_handlers.clone(),
358        }
359    }
360
361    pub fn capabilities(&self) -> watch::Receiver<Option<ServerCapabilities>> {
362        self.capabilities.clone()
363    }
364
365    pub fn request<T: request::Request>(
366        self: &Arc<Self>,
367        params: T::Params,
368    ) -> impl Future<Output = Result<T::Result>>
369    where
370        T::Result: 'static + Send,
371    {
372        let this = self.clone();
373        async move {
374            this.initialized.clone().recv().await;
375            Self::request_internal::<T>(
376                &this.next_id,
377                &this.response_handlers,
378                this.outbound_tx.read().as_ref(),
379                params,
380            )
381            .await
382        }
383    }
384
385    fn request_internal<T: request::Request>(
386        next_id: &AtomicUsize,
387        response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
388        outbound_tx: Option<&channel::Sender<Vec<u8>>>,
389        params: T::Params,
390    ) -> impl 'static + Future<Output = Result<T::Result>>
391    where
392        T::Result: 'static + Send,
393    {
394        let id = next_id.fetch_add(1, SeqCst);
395        let message = serde_json::to_vec(&Request {
396            jsonrpc: JSON_RPC_VERSION,
397            id,
398            method: T::METHOD,
399            params,
400        })
401        .unwrap();
402        let mut response_handlers = response_handlers.lock();
403        let (mut tx, mut rx) = oneshot::channel();
404        response_handlers.insert(
405            id,
406            Box::new(move |result| {
407                let response = match result {
408                    Ok(response) => {
409                        serde_json::from_str(response).context("failed to deserialize response")
410                    }
411                    Err(error) => Err(anyhow!("{}", error.message)),
412                };
413                let _ = tx.try_send(response);
414            }),
415        );
416
417        let send = outbound_tx
418            .as_ref()
419            .ok_or_else(|| {
420                anyhow!("tried to send a request to a language server that has been shut down")
421            })
422            .and_then(|outbound_tx| {
423                outbound_tx
424                    .try_send(message)
425                    .context("failed to write to language server's stdin")?;
426                Ok(())
427            });
428        async move {
429            send?;
430            rx.recv().await.unwrap()
431        }
432    }
433
434    pub fn notify<T: notification::Notification>(
435        self: &Arc<Self>,
436        params: T::Params,
437    ) -> impl Future<Output = Result<()>> {
438        let this = self.clone();
439        async move {
440            this.initialized.clone().recv().await;
441            Self::notify_internal::<T>(this.outbound_tx.read().as_ref(), params)?;
442            Ok(())
443        }
444    }
445
446    fn notify_internal<T: notification::Notification>(
447        outbound_tx: Option<&channel::Sender<Vec<u8>>>,
448        params: T::Params,
449    ) -> Result<()> {
450        let message = serde_json::to_vec(&Notification {
451            jsonrpc: JSON_RPC_VERSION,
452            method: T::METHOD,
453            params,
454        })
455        .unwrap();
456        let outbound_tx = outbound_tx
457            .as_ref()
458            .ok_or_else(|| anyhow!("tried to notify a language server that has been shut down"))?;
459        outbound_tx.try_send(message)?;
460        Ok(())
461    }
462}
463
464impl Drop for LanguageServer {
465    fn drop(&mut self) {
466        if let Some(shutdown) = self.shutdown() {
467            self.executor.spawn(shutdown).detach();
468        }
469    }
470}
471
472impl Subscription {
473    pub fn detach(mut self) {
474        self.method = "";
475    }
476}
477
478impl Drop for Subscription {
479    fn drop(&mut self) {
480        self.notification_handlers.write().remove(self.method);
481    }
482}
483
484#[cfg(any(test, feature = "test-support"))]
485pub struct FakeLanguageServer {
486    handlers: Arc<
487        Mutex<
488            HashMap<
489                &'static str,
490                Box<dyn Send + FnMut(usize, &[u8], gpui::AsyncAppContext) -> Vec<u8>>,
491            >,
492        >,
493    >,
494    outgoing_tx: futures::channel::mpsc::UnboundedSender<Vec<u8>>,
495    incoming_rx: futures::channel::mpsc::UnboundedReceiver<Vec<u8>>,
496}
497
498#[cfg(any(test, feature = "test-support"))]
499impl LanguageServer {
500    pub fn fake(cx: &mut gpui::MutableAppContext) -> (Arc<Self>, FakeLanguageServer) {
501        Self::fake_with_capabilities(Default::default(), cx)
502    }
503
504    pub fn fake_with_capabilities(
505        capabilities: ServerCapabilities,
506        cx: &mut gpui::MutableAppContext,
507    ) -> (Arc<Self>, FakeLanguageServer) {
508        let (stdin_writer, stdin_reader) = async_pipe::pipe();
509        let (stdout_writer, stdout_reader) = async_pipe::pipe();
510
511        let mut fake = FakeLanguageServer::new(stdin_reader, stdout_writer, cx);
512        fake.handle_request::<request::Initialize, _>({
513            let capabilities = capabilities.clone();
514            move |_, _| InitializeResult {
515                capabilities: capabilities.clone(),
516                ..Default::default()
517            }
518        });
519
520        let server = Self::new_internal(
521            stdin_writer,
522            stdout_reader,
523            Path::new("/"),
524            cx.background().clone(),
525        )
526        .unwrap();
527
528        (server, fake)
529    }
530}
531
532#[cfg(any(test, feature = "test-support"))]
533impl FakeLanguageServer {
534    fn new(
535        stdin: async_pipe::PipeReader,
536        stdout: async_pipe::PipeWriter,
537        cx: &mut gpui::MutableAppContext,
538    ) -> Self {
539        use futures::StreamExt as _;
540
541        let (incoming_tx, incoming_rx) = futures::channel::mpsc::unbounded();
542        let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded();
543        let this = Self {
544            outgoing_tx: outgoing_tx.clone(),
545            incoming_rx,
546            handlers: Default::default(),
547        };
548
549        // Receive incoming messages
550        let handlers = this.handlers.clone();
551        cx.spawn(|cx| async move {
552            let mut buffer = Vec::new();
553            let mut stdin = smol::io::BufReader::new(stdin);
554            while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
555                cx.background().simulate_random_delay().await;
556                if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
557                    assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
558
559                    if let Some(handler) = handlers.lock().get_mut(request.method) {
560                        let response =
561                            handler(request.id, request.params.get().as_bytes(), cx.clone());
562                        log::debug!("handled lsp request. method:{}", request.method);
563                        outgoing_tx.unbounded_send(response)?;
564                    } else {
565                        log::debug!("unhandled lsp request. method:{}", request.method);
566                        outgoing_tx.unbounded_send(
567                            serde_json::to_vec(&AnyResponse {
568                                id: request.id,
569                                error: Some(Error {
570                                    message: "no handler".to_string(),
571                                }),
572                                result: None,
573                            })
574                            .unwrap(),
575                        )?;
576                    }
577                } else {
578                    incoming_tx.unbounded_send(buffer.clone())?;
579                }
580            }
581            Ok::<_, anyhow::Error>(())
582        })
583        .detach();
584
585        // Send outgoing messages
586        cx.background()
587            .spawn(async move {
588                let mut stdout = smol::io::BufWriter::new(stdout);
589                while let Some(notification) = outgoing_rx.next().await {
590                    Self::send(&mut stdout, &notification).await;
591                }
592            })
593            .detach();
594
595        this
596    }
597
598    pub async fn notify<T: notification::Notification>(&mut self, params: T::Params) {
599        let message = serde_json::to_vec(&Notification {
600            jsonrpc: JSON_RPC_VERSION,
601            method: T::METHOD,
602            params,
603        })
604        .unwrap();
605        self.outgoing_tx.unbounded_send(message).unwrap();
606    }
607
608    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
609        use futures::StreamExt as _;
610
611        loop {
612            let bytes = self.incoming_rx.next().await.unwrap();
613            if let Ok(notification) = serde_json::from_slice::<Notification<T::Params>>(&bytes) {
614                assert_eq!(notification.method, T::METHOD);
615                return notification.params;
616            } else {
617                log::info!(
618                    "skipping message in fake language server {:?}",
619                    std::str::from_utf8(&bytes)
620                );
621            }
622        }
623    }
624
625    pub fn handle_request<T, F>(
626        &mut self,
627        mut handler: F,
628    ) -> futures::channel::mpsc::UnboundedReceiver<()>
629    where
630        T: 'static + request::Request,
631        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> T::Result,
632    {
633        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
634        self.handlers.lock().insert(
635            T::METHOD,
636            Box::new(move |id, params, cx| {
637                let result = handler(serde_json::from_slice::<T::Params>(params).unwrap(), cx);
638                let result = serde_json::to_string(&result).unwrap();
639                let result = serde_json::from_str::<&RawValue>(&result).unwrap();
640                let response = AnyResponse {
641                    id,
642                    error: None,
643                    result: Some(result),
644                };
645                responded_tx.unbounded_send(()).ok();
646                serde_json::to_vec(&response).unwrap()
647            }),
648        );
649        responded_rx
650    }
651
652    pub fn remove_request_handler<T>(&mut self)
653    where
654        T: 'static + request::Request,
655    {
656        self.handlers.lock().remove(T::METHOD);
657    }
658
659    pub async fn start_progress(&mut self, token: impl Into<String>) {
660        self.notify::<notification::Progress>(ProgressParams {
661            token: NumberOrString::String(token.into()),
662            value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
663        })
664        .await;
665    }
666
667    pub async fn end_progress(&mut self, token: impl Into<String>) {
668        self.notify::<notification::Progress>(ProgressParams {
669            token: NumberOrString::String(token.into()),
670            value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
671        })
672        .await;
673    }
674
675    async fn send(stdout: &mut smol::io::BufWriter<async_pipe::PipeWriter>, message: &[u8]) {
676        stdout
677            .write_all(CONTENT_LEN_HEADER.as_bytes())
678            .await
679            .unwrap();
680        stdout
681            .write_all((format!("{}", message.len())).as_bytes())
682            .await
683            .unwrap();
684        stdout.write_all("\r\n\r\n".as_bytes()).await.unwrap();
685        stdout.write_all(&message).await.unwrap();
686        stdout.flush().await.unwrap();
687    }
688
689    async fn receive(
690        stdin: &mut smol::io::BufReader<async_pipe::PipeReader>,
691        buffer: &mut Vec<u8>,
692    ) -> Result<()> {
693        buffer.clear();
694        stdin.read_until(b'\n', buffer).await?;
695        stdin.read_until(b'\n', buffer).await?;
696        let message_len: usize = std::str::from_utf8(buffer)
697            .unwrap()
698            .strip_prefix(CONTENT_LEN_HEADER)
699            .unwrap()
700            .trim_end()
701            .parse()
702            .unwrap();
703        buffer.resize(message_len, 0);
704        stdin.read_exact(buffer).await?;
705        Ok(())
706    }
707}
708
709#[cfg(test)]
710mod tests {
711    use super::*;
712    use gpui::TestAppContext;
713
714    #[ctor::ctor]
715    fn init_logger() {
716        if std::env::var("RUST_LOG").is_ok() {
717            env_logger::init();
718        }
719    }
720
721    #[gpui::test]
722    async fn test_fake(mut cx: TestAppContext) {
723        let (server, mut fake) = cx.update(LanguageServer::fake);
724
725        let (message_tx, message_rx) = channel::unbounded();
726        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
727        server
728            .on_notification::<notification::ShowMessage, _>(move |params| {
729                message_tx.try_send(params).unwrap()
730            })
731            .detach();
732        server
733            .on_notification::<notification::PublishDiagnostics, _>(move |params| {
734                diagnostics_tx.try_send(params).unwrap()
735            })
736            .detach();
737
738        server
739            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
740                text_document: TextDocumentItem::new(
741                    Url::from_str("file://a/b").unwrap(),
742                    "rust".to_string(),
743                    0,
744                    "".to_string(),
745                ),
746            })
747            .await
748            .unwrap();
749        assert_eq!(
750            fake.receive_notification::<notification::DidOpenTextDocument>()
751                .await
752                .text_document
753                .uri
754                .as_str(),
755            "file://a/b"
756        );
757
758        fake.notify::<notification::ShowMessage>(ShowMessageParams {
759            typ: MessageType::ERROR,
760            message: "ok".to_string(),
761        })
762        .await;
763        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
764            uri: Url::from_str("file://b/c").unwrap(),
765            version: Some(5),
766            diagnostics: vec![],
767        })
768        .await;
769        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
770        assert_eq!(
771            diagnostics_rx.recv().await.unwrap().uri.as_str(),
772            "file://b/c"
773        );
774
775        fake.handle_request::<request::Shutdown, _>(|_, _| ());
776
777        drop(server);
778        fake.receive_notification::<notification::Exit>().await;
779    }
780
781    pub enum ServerStatusNotification {}
782
783    impl notification::Notification for ServerStatusNotification {
784        type Params = ServerStatusParams;
785        const METHOD: &'static str = "experimental/serverStatus";
786    }
787
788    #[derive(Deserialize, Serialize, PartialEq, Eq, Clone)]
789    pub struct ServerStatusParams {
790        pub quiescent: bool,
791    }
792}