lsp.rs

  1pub use lsp_types::*;
  2
  3use anyhow::{anyhow, Context, Result};
  4use collections::HashMap;
  5use futures::{channel::oneshot, io::BufWriter, AsyncRead, AsyncWrite};
  6use gpui::{executor, AsyncAppContext, Task};
  7use parking_lot::Mutex;
  8use postage::{barrier, prelude::Stream};
  9use serde::{de::DeserializeOwned, Deserialize, Serialize};
 10use serde_json::{json, value::RawValue, Value};
 11use smol::{
 12    channel,
 13    io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
 14    process,
 15};
 16use std::{
 17    future::Future,
 18    io::Write,
 19    path::PathBuf,
 20    str::FromStr,
 21    sync::{
 22        atomic::{AtomicUsize, Ordering::SeqCst},
 23        Arc,
 24    },
 25};
 26use std::{path::Path, process::Stdio};
 27use util::{ResultExt, TryFutureExt};
 28
 29const JSON_RPC_VERSION: &'static str = "2.0";
 30const CONTENT_LEN_HEADER: &'static str = "Content-Length: ";
 31
 32type NotificationHandler = Box<dyn Send + FnMut(Option<usize>, &str, AsyncAppContext)>;
 33type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
 34
 35pub struct LanguageServer {
 36    server_id: usize,
 37    next_id: AtomicUsize,
 38    outbound_tx: channel::Sender<Vec<u8>>,
 39    name: String,
 40    capabilities: ServerCapabilities,
 41    notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
 42    response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
 43    executor: Arc<executor::Background>,
 44    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
 45    output_done_rx: Mutex<Option<barrier::Receiver>>,
 46    root_path: PathBuf,
 47}
 48
 49pub struct Subscription {
 50    method: &'static str,
 51    notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
 52}
 53
 54#[derive(Serialize, Deserialize)]
 55struct Request<'a, T> {
 56    jsonrpc: &'a str,
 57    id: usize,
 58    method: &'a str,
 59    params: T,
 60}
 61
 62#[derive(Serialize, Deserialize)]
 63struct AnyResponse<'a> {
 64    id: usize,
 65    #[serde(default)]
 66    error: Option<Error>,
 67    #[serde(borrow)]
 68    result: Option<&'a RawValue>,
 69}
 70
 71#[derive(Serialize)]
 72struct Response<T> {
 73    id: usize,
 74    result: Option<T>,
 75    error: Option<Error>,
 76}
 77
 78#[derive(Serialize, Deserialize)]
 79struct Notification<'a, T> {
 80    #[serde(borrow)]
 81    jsonrpc: &'a str,
 82    #[serde(borrow)]
 83    method: &'a str,
 84    params: T,
 85}
 86
 87#[derive(Deserialize)]
 88struct AnyNotification<'a> {
 89    #[serde(default)]
 90    id: Option<usize>,
 91    #[serde(borrow)]
 92    method: &'a str,
 93    #[serde(borrow)]
 94    params: &'a RawValue,
 95}
 96
 97#[derive(Debug, Serialize, Deserialize)]
 98struct Error {
 99    message: String,
100}
101
102impl LanguageServer {
103    pub fn new(
104        server_id: usize,
105        binary_path: &Path,
106        args: &[&str],
107        root_path: &Path,
108        cx: AsyncAppContext,
109    ) -> Result<Self> {
110        let working_dir = if root_path.is_dir() {
111            root_path
112        } else {
113            root_path.parent().unwrap_or(Path::new("/"))
114        };
115        let mut server = process::Command::new(binary_path)
116            .current_dir(working_dir)
117            .args(args)
118            .stdin(Stdio::piped())
119            .stdout(Stdio::piped())
120            .stderr(Stdio::inherit())
121            .spawn()?;
122        let stdin = server.stdin.take().unwrap();
123        let stdout = server.stdout.take().unwrap();
124        let mut server =
125            Self::new_internal(server_id, stdin, stdout, root_path, cx, |notification| {
126                log::info!(
127                    "unhandled notification {}:\n{}",
128                    notification.method,
129                    serde_json::to_string_pretty(
130                        &Value::from_str(notification.params.get()).unwrap()
131                    )
132                    .unwrap()
133                );
134            });
135        if let Some(name) = binary_path.file_name() {
136            server.name = name.to_string_lossy().to_string();
137        }
138        Ok(server)
139    }
140
141    fn new_internal<Stdin, Stdout, F>(
142        server_id: usize,
143        stdin: Stdin,
144        stdout: Stdout,
145        root_path: &Path,
146        cx: AsyncAppContext,
147        mut on_unhandled_notification: F,
148    ) -> Self
149    where
150        Stdin: AsyncWrite + Unpin + Send + 'static,
151        Stdout: AsyncRead + Unpin + Send + 'static,
152        F: FnMut(AnyNotification) + 'static + Send,
153    {
154        let mut stdin = BufWriter::new(stdin);
155        let mut stdout = BufReader::new(stdout);
156        let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
157        let notification_handlers =
158            Arc::new(Mutex::new(HashMap::<_, NotificationHandler>::default()));
159        let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::default()));
160        let input_task = cx.spawn(|cx| {
161            let notification_handlers = notification_handlers.clone();
162            let response_handlers = response_handlers.clone();
163            async move {
164                let _clear_response_handlers = ClearResponseHandlers(response_handlers.clone());
165                let mut buffer = Vec::new();
166                loop {
167                    buffer.clear();
168                    stdout.read_until(b'\n', &mut buffer).await?;
169                    stdout.read_until(b'\n', &mut buffer).await?;
170                    let message_len: usize = std::str::from_utf8(&buffer)?
171                        .strip_prefix(CONTENT_LEN_HEADER)
172                        .ok_or_else(|| anyhow!("invalid header"))?
173                        .trim_end()
174                        .parse()?;
175
176                    buffer.resize(message_len, 0);
177                    stdout.read_exact(&mut buffer).await?;
178                    log::trace!("incoming message:{}", String::from_utf8_lossy(&buffer));
179
180                    if let Ok(msg) = serde_json::from_slice::<AnyNotification>(&buffer) {
181                        if let Some(handler) = notification_handlers.lock().get_mut(msg.method) {
182                            handler(msg.id, msg.params.get(), cx.clone());
183                        } else {
184                            on_unhandled_notification(msg);
185                        }
186                    } else if let Ok(AnyResponse { id, error, result }) =
187                        serde_json::from_slice(&buffer)
188                    {
189                        if let Some(handler) = response_handlers.lock().remove(&id) {
190                            if let Some(error) = error {
191                                handler(Err(error));
192                            } else if let Some(result) = result {
193                                handler(Ok(result.get()));
194                            } else {
195                                handler(Ok("null"));
196                            }
197                        }
198                    } else {
199                        return Err(anyhow!(
200                            "failed to deserialize message:\n{}",
201                            std::str::from_utf8(&buffer)?
202                        ));
203                    }
204
205                    // Don't starve the main thread when receiving lots of messages at once.
206                    smol::future::yield_now().await;
207                }
208            }
209            .log_err()
210        });
211        let (output_done_tx, output_done_rx) = barrier::channel();
212        let output_task = cx.background().spawn({
213            let response_handlers = response_handlers.clone();
214            async move {
215                let _clear_response_handlers = ClearResponseHandlers(response_handlers);
216                let mut content_len_buffer = Vec::new();
217                while let Ok(message) = outbound_rx.recv().await {
218                    log::trace!("outgoing message:{}", String::from_utf8_lossy(&message));
219                    content_len_buffer.clear();
220                    write!(content_len_buffer, "{}", message.len()).unwrap();
221                    stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
222                    stdin.write_all(&content_len_buffer).await?;
223                    stdin.write_all("\r\n\r\n".as_bytes()).await?;
224                    stdin.write_all(&message).await?;
225                    stdin.flush().await?;
226                }
227                drop(output_done_tx);
228                Ok(())
229            }
230            .log_err()
231        });
232
233        Self {
234            server_id,
235            notification_handlers,
236            response_handlers,
237            name: Default::default(),
238            capabilities: Default::default(),
239            next_id: Default::default(),
240            outbound_tx,
241            executor: cx.background().clone(),
242            io_tasks: Mutex::new(Some((input_task, output_task))),
243            output_done_rx: Mutex::new(Some(output_done_rx)),
244            root_path: root_path.to_path_buf(),
245        }
246    }
247
248    pub async fn initialize(mut self, options: Option<Value>) -> Result<Arc<Self>> {
249        let root_uri = Url::from_file_path(&self.root_path).unwrap();
250        #[allow(deprecated)]
251        let params = InitializeParams {
252            process_id: Default::default(),
253            root_path: Default::default(),
254            root_uri: Some(root_uri.clone()),
255            initialization_options: options,
256            capabilities: ClientCapabilities {
257                workspace: Some(WorkspaceClientCapabilities {
258                    configuration: Some(true),
259                    did_change_configuration: Some(DynamicRegistrationClientCapabilities {
260                        dynamic_registration: Some(true),
261                    }),
262                    ..Default::default()
263                }),
264                text_document: Some(TextDocumentClientCapabilities {
265                    definition: Some(GotoCapability {
266                        link_support: Some(true),
267                        ..Default::default()
268                    }),
269                    code_action: Some(CodeActionClientCapabilities {
270                        code_action_literal_support: Some(CodeActionLiteralSupport {
271                            code_action_kind: CodeActionKindLiteralSupport {
272                                value_set: vec![
273                                    CodeActionKind::REFACTOR.as_str().into(),
274                                    CodeActionKind::QUICKFIX.as_str().into(),
275                                    CodeActionKind::SOURCE.as_str().into(),
276                                ],
277                            },
278                        }),
279                        data_support: Some(true),
280                        resolve_support: Some(CodeActionCapabilityResolveSupport {
281                            properties: vec!["edit".to_string(), "command".to_string()],
282                        }),
283                        ..Default::default()
284                    }),
285                    completion: Some(CompletionClientCapabilities {
286                        completion_item: Some(CompletionItemCapability {
287                            snippet_support: Some(true),
288                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
289                                properties: vec!["additionalTextEdits".to_string()],
290                            }),
291                            ..Default::default()
292                        }),
293                        ..Default::default()
294                    }),
295                    rename: Some(RenameClientCapabilities {
296                        prepare_support: Some(true),
297                        ..Default::default()
298                    }),
299                    hover: Some(HoverClientCapabilities {
300                        content_format: Some(vec![MarkupKind::Markdown]),
301                        ..Default::default()
302                    }),
303                    ..Default::default()
304                }),
305                experimental: Some(json!({
306                    "serverStatusNotification": true,
307                })),
308                window: Some(WindowClientCapabilities {
309                    work_done_progress: Some(true),
310                    ..Default::default()
311                }),
312                ..Default::default()
313            },
314            trace: Default::default(),
315            workspace_folders: Some(vec![WorkspaceFolder {
316                uri: root_uri,
317                name: Default::default(),
318            }]),
319            client_info: Default::default(),
320            locale: Default::default(),
321        };
322
323        let response = self.request::<request::Initialize>(params).await?;
324        if let Some(info) = response.server_info {
325            self.name = info.name;
326        }
327        self.capabilities = response.capabilities;
328
329        self.notify::<notification::Initialized>(InitializedParams {})?;
330        Ok(Arc::new(self))
331    }
332
333    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
334        if let Some(tasks) = self.io_tasks.lock().take() {
335            let response_handlers = self.response_handlers.clone();
336            let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
337            let outbound_tx = self.outbound_tx.clone();
338            let mut output_done = self.output_done_rx.lock().take().unwrap();
339            let shutdown_request = Self::request_internal::<request::Shutdown>(
340                &next_id,
341                &response_handlers,
342                &outbound_tx,
343                (),
344            );
345            let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
346            outbound_tx.close();
347            Some(
348                async move {
349                    log::debug!("language server shutdown started");
350                    shutdown_request.await?;
351                    response_handlers.lock().clear();
352                    exit?;
353                    output_done.recv().await;
354                    log::debug!("language server shutdown finished");
355                    drop(tasks);
356                    Ok(())
357                }
358                .log_err(),
359            )
360        } else {
361            None
362        }
363    }
364
365    #[must_use]
366    pub fn on_notification<T, F>(&self, f: F) -> Subscription
367    where
368        T: notification::Notification,
369        F: 'static + Send + FnMut(T::Params, AsyncAppContext),
370    {
371        self.on_custom_notification(T::METHOD, f)
372    }
373
374    #[must_use]
375    pub fn on_request<T, F, Fut>(&self, f: F) -> Subscription
376    where
377        T: request::Request,
378        T::Params: 'static + Send,
379        F: 'static + Send + FnMut(T::Params, AsyncAppContext) -> Fut,
380        Fut: 'static + Future<Output = Result<T::Result>>,
381    {
382        self.on_custom_request(T::METHOD, f)
383    }
384
385    pub fn remove_request_handler<T: request::Request>(&self) {
386        self.notification_handlers.lock().remove(T::METHOD);
387    }
388
389    #[must_use]
390    pub fn on_custom_notification<Params, F>(&self, method: &'static str, mut f: F) -> Subscription
391    where
392        F: 'static + Send + FnMut(Params, AsyncAppContext),
393        Params: DeserializeOwned,
394    {
395        let prev_handler = self.notification_handlers.lock().insert(
396            method,
397            Box::new(move |_, params, cx| {
398                if let Some(params) = serde_json::from_str(params).log_err() {
399                    f(params, cx);
400                }
401            }),
402        );
403        assert!(
404            prev_handler.is_none(),
405            "registered multiple handlers for the same LSP method"
406        );
407        Subscription {
408            method,
409            notification_handlers: self.notification_handlers.clone(),
410        }
411    }
412
413    #[must_use]
414    pub fn on_custom_request<Params, Res, Fut, F>(
415        &self,
416        method: &'static str,
417        mut f: F,
418    ) -> Subscription
419    where
420        F: 'static + Send + FnMut(Params, AsyncAppContext) -> Fut,
421        Fut: 'static + Future<Output = Result<Res>>,
422        Params: DeserializeOwned + Send + 'static,
423        Res: Serialize,
424    {
425        let outbound_tx = self.outbound_tx.clone();
426        let prev_handler = self.notification_handlers.lock().insert(
427            method,
428            Box::new(move |id, params, cx| {
429                if let Some(id) = id {
430                    if let Some(params) = serde_json::from_str(params).log_err() {
431                        let response = f(params, cx.clone());
432                        cx.foreground()
433                            .spawn({
434                                let outbound_tx = outbound_tx.clone();
435                                async move {
436                                    let response = match response.await {
437                                        Ok(result) => Response {
438                                            id,
439                                            result: Some(result),
440                                            error: None,
441                                        },
442                                        Err(error) => Response {
443                                            id,
444                                            result: None,
445                                            error: Some(Error {
446                                                message: error.to_string(),
447                                            }),
448                                        },
449                                    };
450                                    if let Some(response) = serde_json::to_vec(&response).log_err()
451                                    {
452                                        outbound_tx.try_send(response).ok();
453                                    }
454                                }
455                            })
456                            .detach();
457                    }
458                }
459            }),
460        );
461        assert!(
462            prev_handler.is_none(),
463            "registered multiple handlers for the same LSP method"
464        );
465        Subscription {
466            method,
467            notification_handlers: self.notification_handlers.clone(),
468        }
469    }
470
471    pub fn name<'a>(self: &'a Arc<Self>) -> &'a str {
472        &self.name
473    }
474
475    pub fn capabilities<'a>(self: &'a Arc<Self>) -> &'a ServerCapabilities {
476        &self.capabilities
477    }
478
479    pub fn server_id(&self) -> usize {
480        self.server_id
481    }
482
483    pub fn request<T: request::Request>(
484        &self,
485        params: T::Params,
486    ) -> impl Future<Output = Result<T::Result>>
487    where
488        T::Result: 'static + Send,
489    {
490        Self::request_internal::<T>(
491            &self.next_id,
492            &self.response_handlers,
493            &self.outbound_tx,
494            params,
495        )
496    }
497
498    fn request_internal<T: request::Request>(
499        next_id: &AtomicUsize,
500        response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
501        outbound_tx: &channel::Sender<Vec<u8>>,
502        params: T::Params,
503    ) -> impl 'static + Future<Output = Result<T::Result>>
504    where
505        T::Result: 'static + Send,
506    {
507        let id = next_id.fetch_add(1, SeqCst);
508        let message = serde_json::to_vec(&Request {
509            jsonrpc: JSON_RPC_VERSION,
510            id,
511            method: T::METHOD,
512            params,
513        })
514        .unwrap();
515
516        let send = outbound_tx
517            .try_send(message)
518            .context("failed to write to language server's stdin");
519
520        let (tx, rx) = oneshot::channel();
521        response_handlers.lock().insert(
522            id,
523            Box::new(move |result| {
524                let response = match result {
525                    Ok(response) => {
526                        serde_json::from_str(response).context("failed to deserialize response")
527                    }
528                    Err(error) => Err(anyhow!("{}", error.message)),
529                };
530                let _ = tx.send(response);
531            }),
532        );
533
534        async move {
535            send?;
536            rx.await?
537        }
538    }
539
540    pub fn notify<T: notification::Notification>(&self, params: T::Params) -> Result<()> {
541        Self::notify_internal::<T>(&self.outbound_tx, params)
542    }
543
544    fn notify_internal<T: notification::Notification>(
545        outbound_tx: &channel::Sender<Vec<u8>>,
546        params: T::Params,
547    ) -> Result<()> {
548        let message = serde_json::to_vec(&Notification {
549            jsonrpc: JSON_RPC_VERSION,
550            method: T::METHOD,
551            params,
552        })
553        .unwrap();
554        outbound_tx.try_send(message)?;
555        Ok(())
556    }
557}
558
559impl Drop for LanguageServer {
560    fn drop(&mut self) {
561        if let Some(shutdown) = self.shutdown() {
562            self.executor.spawn(shutdown).detach();
563        }
564    }
565}
566
567impl Subscription {
568    pub fn detach(mut self) {
569        self.method = "";
570    }
571}
572
573impl Drop for Subscription {
574    fn drop(&mut self) {
575        self.notification_handlers.lock().remove(self.method);
576    }
577}
578
579#[cfg(any(test, feature = "test-support"))]
580#[derive(Clone)]
581pub struct FakeLanguageServer {
582    pub server: Arc<LanguageServer>,
583    notifications_rx: channel::Receiver<(String, String)>,
584}
585
586#[cfg(any(test, feature = "test-support"))]
587impl LanguageServer {
588    pub fn full_capabilities() -> ServerCapabilities {
589        ServerCapabilities {
590            document_highlight_provider: Some(OneOf::Left(true)),
591            code_action_provider: Some(CodeActionProviderCapability::Simple(true)),
592            document_formatting_provider: Some(OneOf::Left(true)),
593            document_range_formatting_provider: Some(OneOf::Left(true)),
594            ..Default::default()
595        }
596    }
597
598    pub fn fake(
599        name: String,
600        capabilities: ServerCapabilities,
601        cx: AsyncAppContext,
602    ) -> (Self, FakeLanguageServer) {
603        let (stdin_writer, stdin_reader) = async_pipe::pipe();
604        let (stdout_writer, stdout_reader) = async_pipe::pipe();
605        let (notifications_tx, notifications_rx) = channel::unbounded();
606
607        let server = Self::new_internal(
608            0,
609            stdin_writer,
610            stdout_reader,
611            Path::new("/"),
612            cx.clone(),
613            |_| {},
614        );
615        let fake = FakeLanguageServer {
616            server: Arc::new(Self::new_internal(
617                0,
618                stdout_writer,
619                stdin_reader,
620                Path::new("/"),
621                cx.clone(),
622                move |msg| {
623                    notifications_tx
624                        .try_send((msg.method.to_string(), msg.params.get().to_string()))
625                        .ok();
626                },
627            )),
628            notifications_rx,
629        };
630        fake.handle_request::<request::Initialize, _, _>({
631            let capabilities = capabilities.clone();
632            move |_, _| {
633                let capabilities = capabilities.clone();
634                let name = name.clone();
635                async move {
636                    Ok(InitializeResult {
637                        capabilities,
638                        server_info: Some(ServerInfo {
639                            name,
640                            ..Default::default()
641                        }),
642                        ..Default::default()
643                    })
644                }
645            }
646        });
647
648        (server, fake)
649    }
650}
651
652#[cfg(any(test, feature = "test-support"))]
653impl FakeLanguageServer {
654    pub fn notify<T: notification::Notification>(&self, params: T::Params) {
655        self.server.notify::<T>(params).ok();
656    }
657
658    pub async fn request<T>(&self, params: T::Params) -> Result<T::Result>
659    where
660        T: request::Request,
661        T::Result: 'static + Send,
662    {
663        self.server.request::<T>(params).await
664    }
665
666    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
667        self.try_receive_notification::<T>().await.unwrap()
668    }
669
670    pub async fn try_receive_notification<T: notification::Notification>(
671        &mut self,
672    ) -> Option<T::Params> {
673        use futures::StreamExt as _;
674
675        loop {
676            let (method, params) = self.notifications_rx.next().await?;
677            if &method == T::METHOD {
678                return Some(serde_json::from_str::<T::Params>(&params).unwrap());
679            } else {
680                log::info!("skipping message in fake language server {:?}", params);
681            }
682        }
683    }
684
685    pub fn handle_request<T, F, Fut>(
686        &self,
687        mut handler: F,
688    ) -> futures::channel::mpsc::UnboundedReceiver<()>
689    where
690        T: 'static + request::Request,
691        T::Params: 'static + Send,
692        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> Fut,
693        Fut: 'static + Send + Future<Output = Result<T::Result>>,
694    {
695        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
696        self.server.remove_request_handler::<T>();
697        self.server
698            .on_request::<T, _, _>(move |params, cx| {
699                let result = handler(params, cx.clone());
700                let responded_tx = responded_tx.clone();
701                async move {
702                    cx.background().simulate_random_delay().await;
703                    let result = result.await;
704                    responded_tx.unbounded_send(()).ok();
705                    result
706                }
707            })
708            .detach();
709        responded_rx
710    }
711
712    pub fn remove_request_handler<T>(&mut self)
713    where
714        T: 'static + request::Request,
715    {
716        self.server.remove_request_handler::<T>();
717    }
718
719    pub async fn start_progress(&self, token: impl Into<String>) {
720        let token = token.into();
721        self.request::<request::WorkDoneProgressCreate>(WorkDoneProgressCreateParams {
722            token: NumberOrString::String(token.clone()),
723        })
724        .await
725        .unwrap();
726        self.notify::<notification::Progress>(ProgressParams {
727            token: NumberOrString::String(token),
728            value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
729        });
730    }
731
732    pub fn end_progress(&self, token: impl Into<String>) {
733        self.notify::<notification::Progress>(ProgressParams {
734            token: NumberOrString::String(token.into()),
735            value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
736        });
737    }
738}
739
740struct ClearResponseHandlers(Arc<Mutex<HashMap<usize, ResponseHandler>>>);
741
742impl Drop for ClearResponseHandlers {
743    fn drop(&mut self) {
744        self.0.lock().clear();
745    }
746}
747
748#[cfg(test)]
749mod tests {
750    use super::*;
751    use gpui::TestAppContext;
752
753    #[ctor::ctor]
754    fn init_logger() {
755        if std::env::var("RUST_LOG").is_ok() {
756            env_logger::init();
757        }
758    }
759
760    #[gpui::test]
761    async fn test_fake(cx: &mut TestAppContext) {
762        let (server, mut fake) =
763            LanguageServer::fake("the-lsp".to_string(), Default::default(), cx.to_async());
764
765        let (message_tx, message_rx) = channel::unbounded();
766        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
767        server
768            .on_notification::<notification::ShowMessage, _>(move |params, _| {
769                message_tx.try_send(params).unwrap()
770            })
771            .detach();
772        server
773            .on_notification::<notification::PublishDiagnostics, _>(move |params, _| {
774                diagnostics_tx.try_send(params).unwrap()
775            })
776            .detach();
777
778        let server = server.initialize(None).await.unwrap();
779        server
780            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
781                text_document: TextDocumentItem::new(
782                    Url::from_str("file://a/b").unwrap(),
783                    "rust".to_string(),
784                    0,
785                    "".to_string(),
786                ),
787            })
788            .unwrap();
789        assert_eq!(
790            fake.receive_notification::<notification::DidOpenTextDocument>()
791                .await
792                .text_document
793                .uri
794                .as_str(),
795            "file://a/b"
796        );
797
798        fake.notify::<notification::ShowMessage>(ShowMessageParams {
799            typ: MessageType::ERROR,
800            message: "ok".to_string(),
801        });
802        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
803            uri: Url::from_str("file://b/c").unwrap(),
804            version: Some(5),
805            diagnostics: vec![],
806        });
807        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
808        assert_eq!(
809            diagnostics_rx.recv().await.unwrap().uri.as_str(),
810            "file://b/c"
811        );
812
813        fake.handle_request::<request::Shutdown, _, _>(|_, _| async move { Ok(()) });
814
815        drop(server);
816        fake.receive_notification::<notification::Exit>().await;
817    }
818}