lsp.rs

  1use anyhow::{anyhow, Context, Result};
  2use futures::{io::BufWriter, AsyncRead, AsyncWrite};
  3use gpui::{executor, Task};
  4use parking_lot::{Mutex, RwLock};
  5use postage::{barrier, oneshot, prelude::Stream, sink::Sink, watch};
  6use serde::{Deserialize, Serialize};
  7use serde_json::{json, value::RawValue, Value};
  8use smol::{
  9    channel,
 10    io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
 11    process::Command,
 12};
 13use std::{
 14    collections::HashMap,
 15    future::Future,
 16    io::Write,
 17    str::FromStr,
 18    sync::{
 19        atomic::{AtomicUsize, Ordering::SeqCst},
 20        Arc,
 21    },
 22};
 23use std::{path::Path, process::Stdio};
 24use util::TryFutureExt;
 25
 26pub use lsp_types::*;
 27
 28const JSON_RPC_VERSION: &'static str = "2.0";
 29const CONTENT_LEN_HEADER: &'static str = "Content-Length: ";
 30
/// Callback invoked with the raw JSON text of an incoming notification's `params`.
type NotificationHandler = Box<dyn Send + Sync + FnMut(&str)>;
/// One-shot callback invoked with either the raw JSON text of a response's `result`
/// or the error object the peer sent back.
type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
 33
/// Client-side handle to a language server reached over a pair of byte streams.
pub struct LanguageServer {
    /// Counter from which request ids are allocated.
    next_id: AtomicUsize,
    /// Queue of serialized messages for the output task; `None` once `shutdown` has taken it.
    outbound_tx: RwLock<Option<channel::Sender<Vec<u8>>>>,
    /// Server capabilities; stays `None` until the initialize request succeeds.
    capabilities: watch::Receiver<Option<ServerCapabilities>>,
    /// Handlers for incoming notifications, keyed by method name.
    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
    /// Handlers for pending requests, keyed by request id.
    response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
    /// Executor used to run the shutdown future when the server is dropped.
    executor: Arc<executor::Background>,
    /// The input and output tasks; taken (and later dropped) by `shutdown`.
    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
    /// Barrier that `request` and `notify` wait on; released once the initialization
    /// attempt has finished.
    initialized: barrier::Receiver,
    /// Barrier released when the output task finishes; taken by `shutdown`.
    output_done_rx: Mutex<Option<barrier::Receiver>>,
}
 45
/// Handle returned by `LanguageServer::on_notification`. Dropping it removes the handler
/// registered under `method`.
pub struct Subscription {
    /// Method name whose handler is removed on drop.
    method: &'static str,
    /// Shared handler map the subscription removes itself from.
    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
}
 50
/// JSON-RPC request envelope with typed `params`.
#[derive(Serialize, Deserialize)]
struct Request<'a, T> {
    jsonrpc: &'a str,
    /// Id used to match the response to this request.
    id: usize,
    method: &'a str,
    params: T,
}
 58
/// Request of any method, borrowed from the input buffer with `params` left as raw JSON.
/// Only compiled for tests / the `test-support` feature.
#[cfg(any(test, feature = "test-support"))]
#[derive(Deserialize)]
struct AnyRequest<'a> {
    id: usize,
    #[serde(borrow)]
    jsonrpc: &'a str,
    #[serde(borrow)]
    method: &'a str,
    #[serde(borrow)]
    params: &'a RawValue,
}
 70
/// Response to any request: an optional error and an optional `result` left as raw JSON.
#[derive(Serialize, Deserialize)]
struct AnyResponse<'a> {
    /// Id of the request this response answers.
    id: usize,
    /// Defaults to `None` when the field is absent.
    #[serde(default)]
    error: Option<Error>,
    #[serde(borrow)]
    result: Option<&'a RawValue>,
}
 79
/// JSON-RPC notification envelope with typed `params` (no `id`, so no response is expected).
#[derive(Serialize, Deserialize)]
struct Notification<'a, T> {
    #[serde(borrow)]
    jsonrpc: &'a str,
    #[serde(borrow)]
    method: &'a str,
    params: T,
}
 88
/// Notification of any method, borrowed from the input buffer with `params` left as raw JSON
/// so it can be handed to the handler registered for `method`.
#[derive(Deserialize)]
struct AnyNotification<'a> {
    #[serde(borrow)]
    method: &'a str,
    #[serde(borrow)]
    params: &'a RawValue,
}
 96
/// Error object carried in a response; only the `message` field is modelled here.
#[derive(Debug, Serialize, Deserialize)]
struct Error {
    message: String,
}
101
102impl LanguageServer {
103    pub fn new(
104        binary_path: &Path,
105        root_path: &Path,
106        background: Arc<executor::Background>,
107    ) -> Result<Arc<Self>> {
108        let mut server = Command::new(binary_path)
109            .stdin(Stdio::piped())
110            .stdout(Stdio::piped())
111            .stderr(Stdio::inherit())
112            .spawn()?;
113        let stdin = server.stdin.take().unwrap();
114        let stdout = server.stdout.take().unwrap();
115        Self::new_internal(stdin, stdout, root_path, background)
116    }
117
118    fn new_internal<Stdin, Stdout>(
119        stdin: Stdin,
120        stdout: Stdout,
121        root_path: &Path,
122        executor: Arc<executor::Background>,
123    ) -> Result<Arc<Self>>
124    where
125        Stdin: AsyncWrite + Unpin + Send + 'static,
126        Stdout: AsyncRead + Unpin + Send + 'static,
127    {
128        let mut stdin = BufWriter::new(stdin);
129        let mut stdout = BufReader::new(stdout);
130        let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
131        let notification_handlers = Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::new()));
132        let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::new()));
133        let input_task = executor.spawn(
134            {
135                let notification_handlers = notification_handlers.clone();
136                let response_handlers = response_handlers.clone();
137                async move {
138                    let mut buffer = Vec::new();
139                    loop {
140                        buffer.clear();
141                        stdout.read_until(b'\n', &mut buffer).await?;
142                        stdout.read_until(b'\n', &mut buffer).await?;
143                        let message_len: usize = std::str::from_utf8(&buffer)?
144                            .strip_prefix(CONTENT_LEN_HEADER)
145                            .ok_or_else(|| anyhow!("invalid header"))?
146                            .trim_end()
147                            .parse()?;
148
149                        buffer.resize(message_len, 0);
150                        stdout.read_exact(&mut buffer).await?;
151
152                        if let Ok(AnyNotification { method, params }) =
153                            serde_json::from_slice(&buffer)
154                        {
155                            if let Some(handler) = notification_handlers.write().get_mut(method) {
156                                handler(params.get());
157                            } else {
158                                log::info!(
159                                    "unhandled notification {}:\n{}",
160                                    method,
161                                    serde_json::to_string_pretty(
162                                        &Value::from_str(params.get()).unwrap()
163                                    )
164                                    .unwrap()
165                                );
166                            }
167                        } else if let Ok(AnyResponse { id, error, result }) =
168                            serde_json::from_slice(&buffer)
169                        {
170                            if let Some(handler) = response_handlers.lock().remove(&id) {
171                                if let Some(error) = error {
172                                    handler(Err(error));
173                                } else if let Some(result) = result {
174                                    handler(Ok(result.get()));
175                                } else {
176                                    handler(Ok("null"));
177                                }
178                            }
179                        } else {
180                            return Err(anyhow!(
181                                "failed to deserialize message:\n{}",
182                                std::str::from_utf8(&buffer)?
183                            ));
184                        }
185                    }
186                }
187            }
188            .log_err(),
189        );
190        let (output_done_tx, output_done_rx) = barrier::channel();
191        let output_task = executor.spawn(
192            async move {
193                let mut content_len_buffer = Vec::new();
194                while let Ok(message) = outbound_rx.recv().await {
195                    content_len_buffer.clear();
196                    write!(content_len_buffer, "{}", message.len()).unwrap();
197                    stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
198                    stdin.write_all(&content_len_buffer).await?;
199                    stdin.write_all("\r\n\r\n".as_bytes()).await?;
200                    stdin.write_all(&message).await?;
201                    stdin.flush().await?;
202                }
203                drop(output_done_tx);
204                Ok(())
205            }
206            .log_err(),
207        );
208
209        let (initialized_tx, initialized_rx) = barrier::channel();
210        let (mut capabilities_tx, capabilities_rx) = watch::channel();
211        let this = Arc::new(Self {
212            notification_handlers,
213            response_handlers,
214            capabilities: capabilities_rx,
215            next_id: Default::default(),
216            outbound_tx: RwLock::new(Some(outbound_tx)),
217            executor: executor.clone(),
218            io_tasks: Mutex::new(Some((input_task, output_task))),
219            initialized: initialized_rx,
220            output_done_rx: Mutex::new(Some(output_done_rx)),
221        });
222
223        let root_uri = Url::from_file_path(root_path).map_err(|_| anyhow!("invalid root path"))?;
224        executor
225            .spawn({
226                let this = this.clone();
227                async move {
228                    if let Some(capabilities) = this.init(root_uri).log_err().await {
229                        *capabilities_tx.borrow_mut() = Some(capabilities);
230                    }
231
232                    drop(initialized_tx);
233                }
234            })
235            .detach();
236
237        Ok(this)
238    }
239
240    async fn init(self: Arc<Self>, root_uri: Url) -> Result<ServerCapabilities> {
241        #[allow(deprecated)]
242        let params = InitializeParams {
243            process_id: Default::default(),
244            root_path: Default::default(),
245            root_uri: Some(root_uri),
246            initialization_options: Default::default(),
247            capabilities: ClientCapabilities {
248                text_document: Some(TextDocumentClientCapabilities {
249                    definition: Some(GotoCapability {
250                        link_support: Some(true),
251                        ..Default::default()
252                    }),
253                    code_action: Some(CodeActionClientCapabilities {
254                        code_action_literal_support: Some(CodeActionLiteralSupport {
255                            code_action_kind: CodeActionKindLiteralSupport {
256                                value_set: vec![
257                                    CodeActionKind::REFACTOR.as_str().into(),
258                                    CodeActionKind::QUICKFIX.as_str().into(),
259                                ],
260                            },
261                        }),
262                        data_support: Some(true),
263                        resolve_support: Some(CodeActionCapabilityResolveSupport {
264                            properties: vec!["edit".to_string()],
265                        }),
266                        ..Default::default()
267                    }),
268                    completion: Some(CompletionClientCapabilities {
269                        completion_item: Some(CompletionItemCapability {
270                            snippet_support: Some(true),
271                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
272                                properties: vec!["additionalTextEdits".to_string()],
273                            }),
274                            ..Default::default()
275                        }),
276                        ..Default::default()
277                    }),
278                    ..Default::default()
279                }),
280                experimental: Some(json!({
281                    "serverStatusNotification": true,
282                })),
283                window: Some(WindowClientCapabilities {
284                    work_done_progress: Some(true),
285                    ..Default::default()
286                }),
287                ..Default::default()
288            },
289            trace: Default::default(),
290            workspace_folders: Default::default(),
291            client_info: Default::default(),
292            locale: Default::default(),
293        };
294
295        let this = self.clone();
296        let request = Self::request_internal::<request::Initialize>(
297            &this.next_id,
298            &this.response_handlers,
299            this.outbound_tx.read().as_ref(),
300            params,
301        );
302        let response = request.await?;
303        Self::notify_internal::<notification::Initialized>(
304            this.outbound_tx.read().as_ref(),
305            InitializedParams {},
306        )?;
307        Ok(response.capabilities)
308    }
309
310    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Result<()>>> {
311        if let Some(tasks) = self.io_tasks.lock().take() {
312            let response_handlers = self.response_handlers.clone();
313            let outbound_tx = self.outbound_tx.write().take();
314            let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
315            let mut output_done = self.output_done_rx.lock().take().unwrap();
316            Some(async move {
317                Self::request_internal::<request::Shutdown>(
318                    &next_id,
319                    &response_handlers,
320                    outbound_tx.as_ref(),
321                    (),
322                )
323                .await?;
324                Self::notify_internal::<notification::Exit>(outbound_tx.as_ref(), ())?;
325                drop(outbound_tx);
326                output_done.recv().await;
327                drop(tasks);
328                Ok(())
329            })
330        } else {
331            None
332        }
333    }
334
335    pub fn on_notification<T, F>(&self, mut f: F) -> Subscription
336    where
337        T: notification::Notification,
338        F: 'static + Send + Sync + FnMut(T::Params),
339    {
340        let prev_handler = self.notification_handlers.write().insert(
341            T::METHOD,
342            Box::new(
343                move |notification| match serde_json::from_str(notification) {
344                    Ok(notification) => f(notification),
345                    Err(err) => log::error!("error parsing notification {}: {}", T::METHOD, err),
346                },
347            ),
348        );
349
350        assert!(
351            prev_handler.is_none(),
352            "registered multiple handlers for the same notification"
353        );
354
355        Subscription {
356            method: T::METHOD,
357            notification_handlers: self.notification_handlers.clone(),
358        }
359    }
360
    /// Returns a new receiver for the server capabilities; the value is `None` until the
    /// initialize request has succeeded.
    pub fn capabilities(&self) -> watch::Receiver<Option<ServerCapabilities>> {
        self.capabilities.clone()
    }
364
365    pub fn request<T: request::Request>(
366        self: &Arc<Self>,
367        params: T::Params,
368    ) -> impl Future<Output = Result<T::Result>>
369    where
370        T::Result: 'static + Send,
371    {
372        let this = self.clone();
373        async move {
374            this.initialized.clone().recv().await;
375            Self::request_internal::<T>(
376                &this.next_id,
377                &this.response_handlers,
378                this.outbound_tx.read().as_ref(),
379                params,
380            )
381            .await
382        }
383    }
384
385    fn request_internal<T: request::Request>(
386        next_id: &AtomicUsize,
387        response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
388        outbound_tx: Option<&channel::Sender<Vec<u8>>>,
389        params: T::Params,
390    ) -> impl 'static + Future<Output = Result<T::Result>>
391    where
392        T::Result: 'static + Send,
393    {
394        let id = next_id.fetch_add(1, SeqCst);
395        let message = serde_json::to_vec(&Request {
396            jsonrpc: JSON_RPC_VERSION,
397            id,
398            method: T::METHOD,
399            params,
400        })
401        .unwrap();
402        let mut response_handlers = response_handlers.lock();
403        let (mut tx, mut rx) = oneshot::channel();
404        response_handlers.insert(
405            id,
406            Box::new(move |result| {
407                let response = match result {
408                    Ok(response) => {
409                        serde_json::from_str(response).context("failed to deserialize response")
410                    }
411                    Err(error) => Err(anyhow!("{}", error.message)),
412                };
413                let _ = tx.try_send(response);
414            }),
415        );
416
417        let send = outbound_tx
418            .as_ref()
419            .ok_or_else(|| {
420                anyhow!("tried to send a request to a language server that has been shut down")
421            })
422            .and_then(|outbound_tx| {
423                outbound_tx
424                    .try_send(message)
425                    .context("failed to write to language server's stdin")?;
426                Ok(())
427            });
428        async move {
429            send?;
430            rx.recv().await.unwrap()
431        }
432    }
433
434    pub fn notify<T: notification::Notification>(
435        self: &Arc<Self>,
436        params: T::Params,
437    ) -> impl Future<Output = Result<()>> {
438        let this = self.clone();
439        async move {
440            this.initialized.clone().recv().await;
441            Self::notify_internal::<T>(this.outbound_tx.read().as_ref(), params)?;
442            Ok(())
443        }
444    }
445
446    fn notify_internal<T: notification::Notification>(
447        outbound_tx: Option<&channel::Sender<Vec<u8>>>,
448        params: T::Params,
449    ) -> Result<()> {
450        let message = serde_json::to_vec(&Notification {
451            jsonrpc: JSON_RPC_VERSION,
452            method: T::METHOD,
453            params,
454        })
455        .unwrap();
456        let outbound_tx = outbound_tx
457            .as_ref()
458            .ok_or_else(|| anyhow!("tried to notify a language server that has been shut down"))?;
459        outbound_tx.try_send(message)?;
460        Ok(())
461    }
462}
463
/// Dropping the server starts the shutdown sequence in the background, unless `shutdown`
/// was already called.
impl Drop for LanguageServer {
    fn drop(&mut self) {
        // `shutdown` returns `None` when the I/O tasks were already taken.
        if let Some(shutdown) = self.shutdown() {
            // The future is detached: its completion and result are not observed here.
            self.executor.spawn(shutdown).detach();
        }
    }
}
471
impl Subscription {
    /// Consumes the subscription while leaving its handler registered.
    pub fn detach(mut self) {
        // `Drop` still runs when `self` goes out of scope, but it now removes the key ""
        // instead of the method this subscription was created for.
        self.method = "";
    }
}
477
/// Removes the handler stored under `self.method` from the shared handler map.
impl Drop for Subscription {
    fn drop(&mut self) {
        self.notification_handlers.write().remove(self.method);
    }
}
483
/// In-process stand-in for a language server, available in tests and under the
/// `test-support` feature.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeLanguageServer {
    /// Request handlers keyed by method name; each receives the request id and the raw
    /// params bytes and returns the serialized response.
    handlers:
        Arc<Mutex<HashMap<&'static str, Box<dyn Send + Sync + FnMut(usize, &[u8]) -> Vec<u8>>>>>,
    /// Queue of serialized messages to be written to the client.
    outgoing_tx: futures::channel::mpsc::UnboundedSender<Vec<u8>>,
    /// Messages from the client that did not parse as requests.
    incoming_rx: futures::channel::mpsc::UnboundedReceiver<Vec<u8>>,
}
491
#[cfg(any(test, feature = "test-support"))]
impl LanguageServer {
    /// Creates a client connected to a fake server that reports default capabilities.
    pub fn fake(executor: Arc<gpui::executor::Background>) -> (Arc<Self>, FakeLanguageServer) {
        Self::fake_with_capabilities(ServerCapabilities::default(), executor)
    }

    /// Creates a client connected, through in-memory pipes, to a fake server that answers
    /// the `initialize` request with `capabilities`.
    pub fn fake_with_capabilities(
        capabilities: ServerCapabilities,
        executor: Arc<gpui::executor::Background>,
    ) -> (Arc<Self>, FakeLanguageServer) {
        // One pipe per direction: client -> server stdin, server stdout -> client.
        let (stdin_writer, stdin_reader) = async_pipe::pipe();
        let (stdout_writer, stdout_reader) = async_pipe::pipe();

        let mut fake_server =
            FakeLanguageServer::new(executor.clone(), stdin_reader, stdout_writer);
        // The handler must be in place before the client is built, because building the
        // client starts the initialize handshake.
        fake_server.handle_request::<request::Initialize, _>(move |_| InitializeResult {
            capabilities: capabilities.clone(),
            ..Default::default()
        });

        let client =
            Self::new_internal(stdin_writer, stdout_reader, Path::new("/"), executor).unwrap();

        (client, fake_server)
    }
}
520
521#[cfg(any(test, feature = "test-support"))]
522impl FakeLanguageServer {
    /// Creates a fake server that reads client messages from `stdin` and writes its own
    /// messages to `stdout`, using two detached tasks spawned on `background`.
    fn new(
        background: Arc<gpui::executor::Background>,
        stdin: async_pipe::PipeReader,
        stdout: async_pipe::PipeWriter,
    ) -> Self {
        use futures::StreamExt as _;

        let (incoming_tx, incoming_rx) = futures::channel::mpsc::unbounded();
        let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded();
        let this = Self {
            outgoing_tx: outgoing_tx.clone(),
            incoming_rx,
            handlers: Default::default(),
        };

        // Receive incoming messages
        let handlers = this.handlers.clone();
        let executor = background.clone();
        background
            .spawn(async move {
                let mut buffer = Vec::new();
                let mut stdin = smol::io::BufReader::new(stdin);
                // The loop ends as soon as `receive` reports an error.
                while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
                    executor.simulate_random_delay().await;
                    if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
                        assert_eq!(request.jsonrpc, JSON_RPC_VERSION);

                        if let Some(handler) = handlers.lock().get_mut(request.method) {
                            let response = handler(request.id, request.params.get().as_bytes());
                            log::debug!("handled lsp request. method:{}", request.method);
                            outgoing_tx.unbounded_send(response)?;
                        } else {
                            // No handler registered: answer with an error response.
                            log::debug!("unhandled lsp request. method:{}", request.method);
                            outgoing_tx.unbounded_send(
                                serde_json::to_vec(&AnyResponse {
                                    id: request.id,
                                    error: Some(Error {
                                        message: "no handler".to_string(),
                                    }),
                                    result: None,
                                })
                                .unwrap(),
                            )?;
                        }
                    } else {
                        // Not a request: queue the raw bytes for `receive_notification`.
                        incoming_tx.unbounded_send(buffer.clone())?;
                    }
                }
                Ok::<_, anyhow::Error>(())
            })
            .detach();

        // Send outgoing messages
        background
            .spawn(async move {
                let mut stdout = smol::io::BufWriter::new(stdout);
                while let Some(notification) = outgoing_rx.next().await {
                    Self::send(&mut stdout, &notification).await;
                }
            })
            .detach();

        this
    }
587
588    pub async fn notify<T: notification::Notification>(&mut self, params: T::Params) {
589        let message = serde_json::to_vec(&Notification {
590            jsonrpc: JSON_RPC_VERSION,
591            method: T::METHOD,
592            params,
593        })
594        .unwrap();
595        self.outgoing_tx.unbounded_send(message).unwrap();
596    }
597
598    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
599        use futures::StreamExt as _;
600
601        loop {
602            let bytes = self.incoming_rx.next().await.unwrap();
603            if let Ok(notification) = serde_json::from_slice::<Notification<T::Params>>(&bytes) {
604                assert_eq!(notification.method, T::METHOD);
605                return notification.params;
606            } else {
607                log::info!(
608                    "skipping message in fake language server {:?}",
609                    std::str::from_utf8(&bytes)
610                );
611            }
612        }
613    }
614
615    pub fn handle_request<T, F>(
616        &mut self,
617        mut handler: F,
618    ) -> futures::channel::mpsc::UnboundedReceiver<()>
619    where
620        T: 'static + request::Request,
621        F: 'static + Send + Sync + FnMut(T::Params) -> T::Result,
622    {
623        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
624        self.handlers.lock().insert(
625            T::METHOD,
626            Box::new(move |id, params| {
627                let result = handler(serde_json::from_slice::<T::Params>(params).unwrap());
628                let result = serde_json::to_string(&result).unwrap();
629                let result = serde_json::from_str::<&RawValue>(&result).unwrap();
630                let response = AnyResponse {
631                    id,
632                    error: None,
633                    result: Some(result),
634                };
635                responded_tx.unbounded_send(()).ok();
636                serde_json::to_vec(&response).unwrap()
637            }),
638        );
639        responded_rx
640    }
641
    /// Removes the handler registered for `T::METHOD`, if any.
    pub fn remove_request_handler<T>(&mut self)
    where
        T: 'static + request::Request,
    {
        self.handlers.lock().remove(T::METHOD);
    }
648
649    pub async fn start_progress(&mut self, token: impl Into<String>) {
650        self.notify::<notification::Progress>(ProgressParams {
651            token: NumberOrString::String(token.into()),
652            value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
653        })
654        .await;
655    }
656
657    pub async fn end_progress(&mut self, token: impl Into<String>) {
658        self.notify::<notification::Progress>(ProgressParams {
659            token: NumberOrString::String(token.into()),
660            value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
661        })
662        .await;
663    }
664
665    async fn send(stdout: &mut smol::io::BufWriter<async_pipe::PipeWriter>, message: &[u8]) {
666        stdout
667            .write_all(CONTENT_LEN_HEADER.as_bytes())
668            .await
669            .unwrap();
670        stdout
671            .write_all((format!("{}", message.len())).as_bytes())
672            .await
673            .unwrap();
674        stdout.write_all("\r\n\r\n".as_bytes()).await.unwrap();
675        stdout.write_all(&message).await.unwrap();
676        stdout.flush().await.unwrap();
677    }
678
679    async fn receive(
680        stdin: &mut smol::io::BufReader<async_pipe::PipeReader>,
681        buffer: &mut Vec<u8>,
682    ) -> Result<()> {
683        buffer.clear();
684        stdin.read_until(b'\n', buffer).await?;
685        stdin.read_until(b'\n', buffer).await?;
686        let message_len: usize = std::str::from_utf8(buffer)
687            .unwrap()
688            .strip_prefix(CONTENT_LEN_HEADER)
689            .unwrap()
690            .trim_end()
691            .parse()
692            .unwrap();
693        buffer.resize(message_len, 0);
694        stdin.read_exact(buffer).await?;
695        Ok(())
696    }
697}
698
699#[cfg(test)]
700mod tests {
701    use super::*;
702    use gpui::TestAppContext;
703    use unindent::Unindent;
704    use util::test::temp_tree;
705
    /// Initializes `env_logger`, but only when `RUST_LOG` is set in the environment.
    #[ctor::ctor]
    fn init_logger() {
        if std::env::var("RUST_LOG").is_ok() {
            env_logger::init();
        }
    }
712
    /// End-to-end test against a `rust-analyzer` binary spawned by name: opens a small
    /// crate from a temporary directory and checks the hover result for a `let` binding.
    #[gpui::test]
    async fn test_rust_analyzer(cx: TestAppContext) {
        let lib_source = r#"
            fn fun() {
                let hello = "world";
            }
        "#
        .unindent();
        let root_dir = temp_tree(json!({
            "Cargo.toml": r#"
                [package]
                name = "temp"
                version = "0.1.0"
                edition = "2018"
            "#.unindent(),
            "src": {
                "lib.rs": &lib_source
            }
        }));
        let lib_file_uri = Url::from_file_path(root_dir.path().join("src/lib.rs")).unwrap();

        let server =
            LanguageServer::new(Path::new("rust-analyzer"), root_dir.path(), cx.background())
                .unwrap();
        // Wait until the server reports it is quiescent before talking to it.
        server.next_idle_notification().await;

        server
            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
                text_document: TextDocumentItem::new(
                    lib_file_uri.clone(),
                    "rust".to_string(),
                    0,
                    lib_source,
                ),
            })
            .await
            .unwrap();

        let hover = server
            .request::<request::HoverRequest>(HoverParams {
                text_document_position_params: TextDocumentPositionParams {
                    text_document: TextDocumentIdentifier::new(lib_file_uri),
                    position: Position::new(1, 21),
                },
                work_done_progress_params: Default::default(),
            })
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            hover.contents,
            HoverContents::Markup(MarkupContent {
                kind: MarkupKind::PlainText,
                value: "&str".to_string()
            })
        );
    }
770
    /// Exercises the client against `FakeLanguageServer`: a client-to-server notification,
    /// two server-to-client notifications, and the shutdown sequence triggered by dropping
    /// the client.
    #[gpui::test]
    async fn test_fake(cx: TestAppContext) {
        let (server, mut fake) = LanguageServer::fake(cx.background());

        let (message_tx, message_rx) = channel::unbounded();
        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
        // Detach both subscriptions so the handlers stay registered for the whole test.
        server
            .on_notification::<notification::ShowMessage, _>(move |params| {
                message_tx.try_send(params).unwrap()
            })
            .detach();
        server
            .on_notification::<notification::PublishDiagnostics, _>(move |params| {
                diagnostics_tx.try_send(params).unwrap()
            })
            .detach();

        server
            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
                text_document: TextDocumentItem::new(
                    Url::from_str("file://a/b").unwrap(),
                    "rust".to_string(),
                    0,
                    "".to_string(),
                ),
            })
            .await
            .unwrap();
        assert_eq!(
            fake.receive_notification::<notification::DidOpenTextDocument>()
                .await
                .text_document
                .uri
                .as_str(),
            "file://a/b"
        );

        fake.notify::<notification::ShowMessage>(ShowMessageParams {
            typ: MessageType::ERROR,
            message: "ok".to_string(),
        })
        .await;
        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
            uri: Url::from_str("file://b/c").unwrap(),
            version: Some(5),
            diagnostics: vec![],
        })
        .await;
        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
        assert_eq!(
            diagnostics_rx.recv().await.unwrap().uri.as_str(),
            "file://b/c"
        );

        // The fake must answer the shutdown request for the exit notification to follow.
        fake.handle_request::<request::Shutdown, _>(|_| ());

        // Dropping the client spawns its shutdown future.
        drop(server);
        fake.receive_notification::<notification::Exit>().await;
    }
830
    impl LanguageServer {
        /// Waits for the next `experimental/serverStatus` notification whose `quiescent`
        /// flag is set. The temporary handler is removed when this function returns,
        /// because the subscription is dropped.
        async fn next_idle_notification(self: &Arc<Self>) {
            let (tx, rx) = channel::unbounded();
            let _subscription =
                self.on_notification::<ServerStatusNotification, _>(move |params| {
                    if params.quiescent {
                        tx.try_send(()).unwrap();
                    }
                });
            let _ = rx.recv().await;
        }
    }
843
    /// Marker type for the `experimental/serverStatus` notification used by these tests.
    pub enum ServerStatusNotification {}
845
    /// Binds the marker type to its method name and params type.
    impl notification::Notification for ServerStatusNotification {
        type Params = ServerStatusParams;
        const METHOD: &'static str = "experimental/serverStatus";
    }
850
    /// Params of the server status notification; only the `quiescent` flag is modelled.
    #[derive(Deserialize, Serialize, PartialEq, Eq, Clone)]
    pub struct ServerStatusParams {
        pub quiescent: bool,
    }
855}