lsp.rs

  1use anyhow::{anyhow, Context, Result};
  2use collections::HashMap;
  3use futures::{channel::oneshot, io::BufWriter, AsyncRead, AsyncWrite};
  4use gpui::{executor, Task};
  5use parking_lot::{Mutex, RwLock};
  6use postage::{barrier, prelude::Stream};
  7use serde::{Deserialize, Serialize};
  8use serde_json::{json, value::RawValue, Value};
  9use smol::{
 10    channel,
 11    io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
 12    process::Command,
 13};
 14use std::{
 15    future::Future,
 16    io::Write,
 17    path::PathBuf,
 18    str::FromStr,
 19    sync::{
 20        atomic::{AtomicUsize, Ordering::SeqCst},
 21        Arc,
 22    },
 23};
 24use std::{path::Path, process::Stdio};
 25use util::TryFutureExt;
 26
 27pub use lsp_types::*;
 28
 29const JSON_RPC_VERSION: &'static str = "2.0";
 30const CONTENT_LEN_HEADER: &'static str = "Content-Length: ";
 31
/// Callback invoked by the input task with the raw JSON text of a notification's `params`.
type NotificationHandler = Box<dyn Send + Sync + FnMut(&str)>;
/// One-shot callback invoked by the input task with either the raw JSON text of a
/// response's `result` or the error carried by the response.
type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
 34
/// Client-side handle to a language server reached over a pair of byte streams.
pub struct LanguageServer {
    /// Id that will be assigned to the next outgoing request.
    next_id: AtomicUsize,
    /// Queue of serialized messages waiting to be written by the output task.
    outbound_tx: channel::Sender<Vec<u8>>,
    /// Capabilities reported by the server; defaulted until `initialize` stores the response.
    capabilities: ServerCapabilities,
    /// Notification callbacks, keyed by notification method name.
    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
    /// Callbacks for requests that have not been answered yet, keyed by request id.
    response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
    /// Executor used to spawn the shutdown future when the server is dropped.
    executor: Arc<executor::Background>,
    /// The `(input, output)` tasks; taken (and later dropped) by `shutdown`.
    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
    /// Receiver half of the barrier whose sender is dropped when the output task's loop ends.
    output_done_rx: Mutex<Option<barrier::Receiver>>,
    /// Path sent to the server as the root URI during `initialize`.
    root_path: PathBuf,
    /// Initialization options forwarded to the server during `initialize`.
    options: Option<Value>,
}
 47
/// Handle returned by `LanguageServer::on_notification`.
///
/// Dropping it removes the handler registered under `method`; call `detach` to keep the
/// handler registered instead.
pub struct Subscription {
    /// Method name the handler was registered under.
    method: &'static str,
    /// Shared handler table the subscription removes itself from on drop.
    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
}
 52
/// An outgoing JSON-RPC request with typed `params`.
#[derive(Serialize, Deserialize)]
struct Request<'a, T> {
    /// Protocol version string.
    jsonrpc: &'a str,
    /// Id used to match the response to this request.
    id: usize,
    /// Name of the method being invoked.
    method: &'a str,
    /// Method-specific parameters.
    params: T,
}
 60
/// Borrowed view of an incoming request whose `params` are kept as raw JSON.
///
/// Used by the fake server to dispatch on `method` before the parameters are decoded.
#[cfg(any(test, feature = "test-support"))]
#[derive(Deserialize)]
struct AnyRequest<'a> {
    /// Id to echo back in the response.
    id: usize,
    /// Protocol version string sent by the client.
    #[serde(borrow)]
    jsonrpc: &'a str,
    /// Name of the requested method.
    #[serde(borrow)]
    method: &'a str,
    /// Undecoded parameters, borrowed from the message buffer.
    #[serde(borrow)]
    params: &'a RawValue,
}
 72
/// A response to a request, with the `result` kept as raw JSON.
#[derive(Serialize, Deserialize)]
struct AnyResponse<'a> {
    /// Id of the request this response answers.
    id: usize,
    /// Error reported for the request; `None` when the field is absent.
    #[serde(default)]
    error: Option<Error>,
    /// Undecoded result, borrowed from the message buffer.
    #[serde(borrow)]
    result: Option<&'a RawValue>,
}
 81
/// A JSON-RPC notification (a message without an id) with typed `params`.
#[derive(Serialize, Deserialize)]
struct Notification<'a, T> {
    /// Protocol version string.
    #[serde(borrow)]
    jsonrpc: &'a str,
    /// Name of the notification method.
    #[serde(borrow)]
    method: &'a str,
    /// Method-specific parameters.
    params: T,
}
 90
/// Borrowed view of an incoming notification whose `params` are kept as raw JSON.
///
/// The input task tries this shape first for every incoming message.
#[derive(Deserialize)]
struct AnyNotification<'a> {
    /// Name of the notification method, used to look up the handler.
    #[serde(borrow)]
    method: &'a str,
    /// Undecoded parameters handed to the handler as text.
    #[serde(borrow)]
    params: &'a RawValue,
}
 98
/// Error object carried in the `error` field of a response.
#[derive(Debug, Serialize, Deserialize)]
struct Error {
    /// Human-readable description of the failure.
    message: String,
}
103
104impl LanguageServer {
105    pub fn new(
106        binary_path: &Path,
107        args: &[&str],
108        root_path: &Path,
109        options: Option<Value>,
110        background: Arc<executor::Background>,
111    ) -> Result<Self> {
112        let mut server = Command::new(binary_path)
113            .current_dir(root_path)
114            .args(args)
115            .stdin(Stdio::piped())
116            .stdout(Stdio::piped())
117            .stderr(Stdio::inherit())
118            .spawn()?;
119        let stdin = server.stdin.take().unwrap();
120        let stdout = server.stdout.take().unwrap();
121        Ok(Self::new_internal(
122            stdin, stdout, root_path, options, background,
123        ))
124    }
125
126    fn new_internal<Stdin, Stdout>(
127        stdin: Stdin,
128        stdout: Stdout,
129        root_path: &Path,
130        options: Option<Value>,
131        executor: Arc<executor::Background>,
132    ) -> Self
133    where
134        Stdin: AsyncWrite + Unpin + Send + 'static,
135        Stdout: AsyncRead + Unpin + Send + 'static,
136    {
137        let mut stdin = BufWriter::new(stdin);
138        let mut stdout = BufReader::new(stdout);
139        let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
140        let notification_handlers =
141            Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::default()));
142        let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::default()));
143        let input_task = executor.spawn(
144            {
145                let notification_handlers = notification_handlers.clone();
146                let response_handlers = response_handlers.clone();
147                async move {
148                    let _clear_response_handlers = ClearResponseHandlers(response_handlers.clone());
149                    let mut buffer = Vec::new();
150                    loop {
151                        buffer.clear();
152                        stdout.read_until(b'\n', &mut buffer).await?;
153                        stdout.read_until(b'\n', &mut buffer).await?;
154                        let message_len: usize = std::str::from_utf8(&buffer)?
155                            .strip_prefix(CONTENT_LEN_HEADER)
156                            .ok_or_else(|| anyhow!("invalid header"))?
157                            .trim_end()
158                            .parse()?;
159
160                        buffer.resize(message_len, 0);
161                        stdout.read_exact(&mut buffer).await?;
162
163                        if let Ok(AnyNotification { method, params }) =
164                            serde_json::from_slice(&buffer)
165                        {
166                            if let Some(handler) = notification_handlers.write().get_mut(method) {
167                                handler(params.get());
168                            } else {
169                                log::info!(
170                                    "unhandled notification {}:\n{}",
171                                    method,
172                                    serde_json::to_string_pretty(
173                                        &Value::from_str(params.get()).unwrap()
174                                    )
175                                    .unwrap()
176                                );
177                            }
178                        } else if let Ok(AnyResponse { id, error, result }) =
179                            serde_json::from_slice(&buffer)
180                        {
181                            if let Some(handler) = response_handlers.lock().remove(&id) {
182                                if let Some(error) = error {
183                                    handler(Err(error));
184                                } else if let Some(result) = result {
185                                    handler(Ok(result.get()));
186                                } else {
187                                    handler(Ok("null"));
188                                }
189                            }
190                        } else {
191                            return Err(anyhow!(
192                                "failed to deserialize message:\n{}",
193                                std::str::from_utf8(&buffer)?
194                            ));
195                        }
196                    }
197                }
198            }
199            .log_err(),
200        );
201        let (output_done_tx, output_done_rx) = barrier::channel();
202        let output_task = executor.spawn({
203            let response_handlers = response_handlers.clone();
204            async move {
205                let _clear_response_handlers = ClearResponseHandlers(response_handlers);
206                let mut content_len_buffer = Vec::new();
207                while let Ok(message) = outbound_rx.recv().await {
208                    content_len_buffer.clear();
209                    write!(content_len_buffer, "{}", message.len()).unwrap();
210                    stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
211                    stdin.write_all(&content_len_buffer).await?;
212                    stdin.write_all("\r\n\r\n".as_bytes()).await?;
213                    stdin.write_all(&message).await?;
214                    stdin.flush().await?;
215                }
216                drop(output_done_tx);
217                Ok(())
218            }
219            .log_err()
220        });
221
222        Self {
223            notification_handlers,
224            response_handlers,
225            capabilities: Default::default(),
226            next_id: Default::default(),
227            outbound_tx,
228            executor: executor.clone(),
229            io_tasks: Mutex::new(Some((input_task, output_task))),
230            output_done_rx: Mutex::new(Some(output_done_rx)),
231            root_path: root_path.to_path_buf(),
232            options,
233        }
234    }
235
236    pub async fn initialize(mut self) -> Result<Arc<Self>> {
237        let options = self.options.take();
238        let mut this = Arc::new(self);
239        let root_uri = Url::from_file_path(&this.root_path).unwrap();
240        #[allow(deprecated)]
241        let params = InitializeParams {
242            process_id: Default::default(),
243            root_path: Default::default(),
244            root_uri: Some(root_uri),
245            initialization_options: options,
246            capabilities: ClientCapabilities {
247                text_document: Some(TextDocumentClientCapabilities {
248                    definition: Some(GotoCapability {
249                        link_support: Some(true),
250                        ..Default::default()
251                    }),
252                    code_action: Some(CodeActionClientCapabilities {
253                        code_action_literal_support: Some(CodeActionLiteralSupport {
254                            code_action_kind: CodeActionKindLiteralSupport {
255                                value_set: vec![
256                                    CodeActionKind::REFACTOR.as_str().into(),
257                                    CodeActionKind::QUICKFIX.as_str().into(),
258                                ],
259                            },
260                        }),
261                        data_support: Some(true),
262                        resolve_support: Some(CodeActionCapabilityResolveSupport {
263                            properties: vec!["edit".to_string()],
264                        }),
265                        ..Default::default()
266                    }),
267                    completion: Some(CompletionClientCapabilities {
268                        completion_item: Some(CompletionItemCapability {
269                            snippet_support: Some(true),
270                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
271                                properties: vec!["additionalTextEdits".to_string()],
272                            }),
273                            ..Default::default()
274                        }),
275                        ..Default::default()
276                    }),
277                    ..Default::default()
278                }),
279                experimental: Some(json!({
280                    "serverStatusNotification": true,
281                })),
282                window: Some(WindowClientCapabilities {
283                    work_done_progress: Some(true),
284                    ..Default::default()
285                }),
286                ..Default::default()
287            },
288            trace: Default::default(),
289            workspace_folders: Default::default(),
290            client_info: Default::default(),
291            locale: Default::default(),
292        };
293
294        let response = this.request::<request::Initialize>(params).await?;
295        Arc::get_mut(&mut this).unwrap().capabilities = response.capabilities;
296        this.notify::<notification::Initialized>(InitializedParams {})?;
297        Ok(this)
298    }
299
300    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
301        if let Some(tasks) = self.io_tasks.lock().take() {
302            let response_handlers = self.response_handlers.clone();
303            let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
304            let outbound_tx = self.outbound_tx.clone();
305            let mut output_done = self.output_done_rx.lock().take().unwrap();
306            let shutdown_request = Self::request_internal::<request::Shutdown>(
307                &next_id,
308                &response_handlers,
309                &outbound_tx,
310                (),
311            );
312            let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
313            outbound_tx.close();
314            Some(
315                async move {
316                    log::debug!("language server shutdown started");
317                    shutdown_request.await?;
318                    response_handlers.lock().clear();
319                    exit?;
320                    output_done.recv().await;
321                    log::debug!("language server shutdown finished");
322                    drop(tasks);
323                    Ok(())
324                }
325                .log_err(),
326            )
327        } else {
328            None
329        }
330    }
331
332    pub fn on_notification<T, F>(&mut self, mut f: F) -> Subscription
333    where
334        T: notification::Notification,
335        F: 'static + Send + Sync + FnMut(T::Params),
336    {
337        let prev_handler = self.notification_handlers.write().insert(
338            T::METHOD,
339            Box::new(
340                move |notification| match serde_json::from_str(notification) {
341                    Ok(notification) => f(notification),
342                    Err(err) => log::error!("error parsing notification {}: {}", T::METHOD, err),
343                },
344            ),
345        );
346
347        assert!(
348            prev_handler.is_none(),
349            "registered multiple handlers for the same notification"
350        );
351
352        Subscription {
353            method: T::METHOD,
354            notification_handlers: self.notification_handlers.clone(),
355        }
356    }
357
358    pub fn capabilities<'a>(self: &'a Arc<Self>) -> &'a ServerCapabilities {
359        &self.capabilities
360    }
361
362    pub fn request<T: request::Request>(
363        self: &Arc<Self>,
364        params: T::Params,
365    ) -> impl Future<Output = Result<T::Result>>
366    where
367        T::Result: 'static + Send,
368    {
369        Self::request_internal::<T>(
370            &self.next_id,
371            &self.response_handlers,
372            &self.outbound_tx,
373            params,
374        )
375    }
376
377    fn request_internal<T: request::Request>(
378        next_id: &AtomicUsize,
379        response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
380        outbound_tx: &channel::Sender<Vec<u8>>,
381        params: T::Params,
382    ) -> impl 'static + Future<Output = Result<T::Result>>
383    where
384        T::Result: 'static + Send,
385    {
386        let id = next_id.fetch_add(1, SeqCst);
387        let message = serde_json::to_vec(&Request {
388            jsonrpc: JSON_RPC_VERSION,
389            id,
390            method: T::METHOD,
391            params,
392        })
393        .unwrap();
394
395        let send = outbound_tx
396            .try_send(message)
397            .context("failed to write to language server's stdin");
398
399        let (tx, rx) = oneshot::channel();
400        response_handlers.lock().insert(
401            id,
402            Box::new(move |result| {
403                let response = match result {
404                    Ok(response) => {
405                        serde_json::from_str(response).context("failed to deserialize response")
406                    }
407                    Err(error) => Err(anyhow!("{}", error.message)),
408                };
409                let _ = tx.send(response);
410            }),
411        );
412
413        async move {
414            send?;
415            rx.await?
416        }
417    }
418
419    pub fn notify<T: notification::Notification>(&self, params: T::Params) -> Result<()> {
420        Self::notify_internal::<T>(&self.outbound_tx, params)
421    }
422
423    fn notify_internal<T: notification::Notification>(
424        outbound_tx: &channel::Sender<Vec<u8>>,
425        params: T::Params,
426    ) -> Result<()> {
427        let message = serde_json::to_vec(&Notification {
428            jsonrpc: JSON_RPC_VERSION,
429            method: T::METHOD,
430            params,
431        })
432        .unwrap();
433        outbound_tx.try_send(message)?;
434        Ok(())
435    }
436}
437
438impl Drop for LanguageServer {
439    fn drop(&mut self) {
440        if let Some(shutdown) = self.shutdown() {
441            self.executor.spawn(shutdown).detach();
442        }
443    }
444}
445
impl Subscription {
    /// Consumes the subscription while leaving its handler registered.
    pub fn detach(mut self) {
        // `Drop` removes the entry stored under `self.method`. Handlers are only ever
        // registered under real method names, so pointing at the empty string turns that
        // removal into a no-op.
        self.method = "";
    }
}
451
452impl Drop for Subscription {
453    fn drop(&mut self) {
454        self.notification_handlers.write().remove(self.method);
455    }
456}
457
/// In-process stand-in for a real language server, used by tests.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeLanguageServer {
    /// Request handlers, keyed by request method name.
    handlers: FakeLanguageServerHandlers,
    /// Serialized messages queued for delivery to the client.
    outgoing_tx: futures::channel::mpsc::UnboundedSender<Vec<u8>>,
    /// Messages received from the client that did not parse as a request.
    incoming_rx: futures::channel::mpsc::UnboundedReceiver<Vec<u8>>,
    /// Task that reads and dispatches messages coming from the client; held so it is not
    /// dropped while the fake server is alive.
    _input_task: Task<Result<()>>,
    /// Task that writes queued messages to the client; held for the same reason.
    _output_task: Task<Result<()>>,
}
466
/// Shared table of fake request handlers, keyed by request method name.
///
/// Each handler receives the request id, the raw JSON bytes of the request parameters and
/// an app context, and returns the fully serialized response message.
#[cfg(any(test, feature = "test-support"))]
type FakeLanguageServerHandlers = Arc<
    Mutex<
        HashMap<
            &'static str,
            Box<dyn Send + FnMut(usize, &[u8], gpui::AsyncAppContext) -> Vec<u8>>,
        >,
    >,
>;
476
#[cfg(any(test, feature = "test-support"))]
impl LanguageServer {
    /// Capabilities advertised by the fake server unless a test supplies its own.
    pub fn full_capabilities() -> ServerCapabilities {
        ServerCapabilities {
            document_highlight_provider: Some(OneOf::Left(true)),
            code_action_provider: Some(CodeActionProviderCapability::Simple(true)),
            document_formatting_provider: Some(OneOf::Left(true)),
            document_range_formatting_provider: Some(OneOf::Left(true)),
            ..Default::default()
        }
    }

    /// Creates a client connected to a fake server that reports `full_capabilities`.
    pub fn fake(cx: &mut gpui::MutableAppContext) -> (Self, FakeLanguageServer) {
        let capabilities = Self::full_capabilities();
        Self::fake_with_capabilities(capabilities, cx)
    }

    /// Creates a client connected through in-memory pipes to a fake server that answers
    /// the `initialize` request with the given `capabilities`.
    pub fn fake_with_capabilities(
        capabilities: ServerCapabilities,
        cx: &mut gpui::MutableAppContext,
    ) -> (Self, FakeLanguageServer) {
        // One pipe per direction: client -> fake server and fake server -> client.
        let (stdin_writer, stdin_reader) = async_pipe::pipe();
        let (stdout_writer, stdout_reader) = async_pipe::pipe();

        let mut fake = FakeLanguageServer::new(stdin_reader, stdout_writer, cx);
        // The handler may run more than once, so it hands out a fresh clone each time.
        fake.handle_request::<request::Initialize, _>(move |_, _| InitializeResult {
            capabilities: capabilities.clone(),
            ..Default::default()
        });

        let server = Self::new_internal(
            stdin_writer,
            stdout_reader,
            Path::new("/"),
            None,
            cx.background().clone(),
        );
        (server, fake)
    }
}
515
516#[cfg(any(test, feature = "test-support"))]
517impl FakeLanguageServer {
518    fn new(
519        stdin: async_pipe::PipeReader,
520        stdout: async_pipe::PipeWriter,
521        cx: &mut gpui::MutableAppContext,
522    ) -> Self {
523        use futures::StreamExt as _;
524
525        let (incoming_tx, incoming_rx) = futures::channel::mpsc::unbounded();
526        let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded();
527        let handlers = FakeLanguageServerHandlers::default();
528
529        let input_task = cx.spawn(|cx| {
530            let handlers = handlers.clone();
531            let outgoing_tx = outgoing_tx.clone();
532            async move {
533                let mut buffer = Vec::new();
534                let mut stdin = smol::io::BufReader::new(stdin);
535                while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
536                    cx.background().simulate_random_delay().await;
537
538                    if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
539                        assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
540
541                        let response;
542                        if let Some(handler) = handlers.lock().get_mut(request.method) {
543                            response =
544                                handler(request.id, request.params.get().as_bytes(), cx.clone());
545                            log::debug!("handled lsp request. method:{}", request.method);
546                        } else {
547                            response = serde_json::to_vec(&AnyResponse {
548                                id: request.id,
549                                error: Some(Error {
550                                    message: "no handler".to_string(),
551                                }),
552                                result: None,
553                            })
554                            .unwrap();
555                            log::debug!("unhandled lsp request. method:{}", request.method);
556                        }
557                        outgoing_tx.unbounded_send(response)?;
558                    } else {
559                        incoming_tx.unbounded_send(buffer.clone())?;
560                    }
561                }
562                Ok::<_, anyhow::Error>(())
563            }
564        });
565
566        let output_task = cx.background().spawn(async move {
567            let mut stdout = smol::io::BufWriter::new(stdout);
568            while let Some(message) = outgoing_rx.next().await {
569                stdout
570                    .write_all(CONTENT_LEN_HEADER.as_bytes())
571                    .await
572                    .unwrap();
573                stdout
574                    .write_all((format!("{}", message.len())).as_bytes())
575                    .await
576                    .unwrap();
577                stdout.write_all("\r\n\r\n".as_bytes()).await.unwrap();
578                stdout.write_all(&message).await.unwrap();
579                stdout.flush().await.unwrap();
580            }
581            Ok(())
582        });
583
584        Self {
585            outgoing_tx,
586            incoming_rx,
587            handlers,
588            _input_task: input_task,
589            _output_task: output_task,
590        }
591    }
592
593    pub fn notify<T: notification::Notification>(&mut self, params: T::Params) {
594        let message = serde_json::to_vec(&Notification {
595            jsonrpc: JSON_RPC_VERSION,
596            method: T::METHOD,
597            params,
598        })
599        .unwrap();
600        self.outgoing_tx.unbounded_send(message).unwrap();
601    }
602
603    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
604        use futures::StreamExt as _;
605
606        loop {
607            let bytes = self.incoming_rx.next().await.unwrap();
608            if let Ok(notification) = serde_json::from_slice::<Notification<T::Params>>(&bytes) {
609                assert_eq!(notification.method, T::METHOD);
610                return notification.params;
611            } else {
612                log::info!(
613                    "skipping message in fake language server {:?}",
614                    std::str::from_utf8(&bytes)
615                );
616            }
617        }
618    }
619
620    pub fn handle_request<T, F>(
621        &mut self,
622        mut handler: F,
623    ) -> futures::channel::mpsc::UnboundedReceiver<()>
624    where
625        T: 'static + request::Request,
626        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> T::Result,
627    {
628        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
629        self.handlers.lock().insert(
630            T::METHOD,
631            Box::new(move |id, params, cx| {
632                let result = handler(serde_json::from_slice::<T::Params>(params).unwrap(), cx);
633                let result = serde_json::to_string(&result).unwrap();
634                let result = serde_json::from_str::<&RawValue>(&result).unwrap();
635                let response = AnyResponse {
636                    id,
637                    error: None,
638                    result: Some(result),
639                };
640                responded_tx.unbounded_send(()).ok();
641                serde_json::to_vec(&response).unwrap()
642            }),
643        );
644        responded_rx
645    }
646
647    pub fn remove_request_handler<T>(&mut self)
648    where
649        T: 'static + request::Request,
650    {
651        self.handlers.lock().remove(T::METHOD);
652    }
653
654    pub async fn start_progress(&mut self, token: impl Into<String>) {
655        self.notify::<notification::Progress>(ProgressParams {
656            token: NumberOrString::String(token.into()),
657            value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
658        });
659    }
660
661    pub async fn end_progress(&mut self, token: impl Into<String>) {
662        self.notify::<notification::Progress>(ProgressParams {
663            token: NumberOrString::String(token.into()),
664            value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
665        });
666    }
667
668    async fn receive(
669        stdin: &mut smol::io::BufReader<async_pipe::PipeReader>,
670        buffer: &mut Vec<u8>,
671    ) -> Result<()> {
672        buffer.clear();
673        stdin.read_until(b'\n', buffer).await?;
674        stdin.read_until(b'\n', buffer).await?;
675        let message_len: usize = std::str::from_utf8(buffer)
676            .unwrap()
677            .strip_prefix(CONTENT_LEN_HEADER)
678            .ok_or_else(|| anyhow!("invalid content length header"))?
679            .trim_end()
680            .parse()
681            .unwrap();
682        buffer.resize(message_len, 0);
683        stdin.read_exact(buffer).await?;
684        Ok(())
685    }
686}
687
688struct ClearResponseHandlers(Arc<Mutex<HashMap<usize, ResponseHandler>>>);
689
690impl Drop for ClearResponseHandlers {
691    fn drop(&mut self) {
692        self.0.lock().clear();
693    }
694}
695
#[cfg(test)]
mod tests {
    use super::*;
    use gpui::TestAppContext;

    // Enables logging for the test binary, but only when `RUST_LOG` is set.
    #[ctor::ctor]
    fn init_logger() {
        if std::env::var("RUST_LOG").is_ok() {
            env_logger::init();
        }
    }

    // End-to-end check of the client against the fake server: notifications in both
    // directions, initialization, and the exit notification sent on drop.
    #[gpui::test]
    async fn test_fake(cx: &mut TestAppContext) {
        let (mut server, mut fake) = cx.update(LanguageServer::fake);

        // Forward notifications received by the client into channels the test can await.
        let (message_tx, message_rx) = channel::unbounded();
        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
        server
            .on_notification::<notification::ShowMessage, _>(move |params| {
                message_tx.try_send(params).unwrap()
            })
            .detach();
        server
            .on_notification::<notification::PublishDiagnostics, _>(move |params| {
                diagnostics_tx.try_send(params).unwrap()
            })
            .detach();

        // Client -> server: a notification sent by the client reaches the fake server.
        let server = server.initialize().await.unwrap();
        server
            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
                text_document: TextDocumentItem::new(
                    Url::from_str("file://a/b").unwrap(),
                    "rust".to_string(),
                    0,
                    "".to_string(),
                ),
            })
            .unwrap();
        assert_eq!(
            fake.receive_notification::<notification::DidOpenTextDocument>()
                .await
                .text_document
                .uri
                .as_str(),
            "file://a/b"
        );

        // Server -> client: notifications sent by the fake server reach the handlers.
        fake.notify::<notification::ShowMessage>(ShowMessageParams {
            typ: MessageType::ERROR,
            message: "ok".to_string(),
        });
        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
            uri: Url::from_str("file://b/c").unwrap(),
            version: Some(5),
            diagnostics: vec![],
        });
        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
        assert_eq!(
            diagnostics_rx.recv().await.unwrap().uri.as_str(),
            "file://b/c"
        );

        fake.handle_request::<request::Shutdown, _>(|_, _| ());

        // Dropping the client starts the shutdown sequence, which ends with `exit`.
        drop(server);
        fake.receive_notification::<notification::Exit>().await;
    }
}
765}