lsp.rs

  1pub use lsp_types::request::*;
  2pub use lsp_types::*;
  3
  4use anyhow::{anyhow, Context, Result};
  5use collections::HashMap;
  6use futures::{channel::oneshot, io::BufWriter, AsyncRead, AsyncWrite};
  7use gpui::{executor, AsyncAppContext, Task};
  8use parking_lot::Mutex;
  9use postage::{barrier, prelude::Stream};
 10use serde::{de::DeserializeOwned, Deserialize, Serialize};
 11use serde_json::{json, value::RawValue, Value};
 12use smol::{
 13    channel,
 14    io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
 15    process::{self, Child},
 16};
 17use std::{
 18    future::Future,
 19    io::Write,
 20    path::PathBuf,
 21    str::FromStr,
 22    sync::{
 23        atomic::{AtomicUsize, Ordering::SeqCst},
 24        Arc,
 25    },
 26};
 27use std::{path::Path, process::Stdio};
 28use util::{ResultExt, TryFutureExt};
 29
/// Protocol version written into the `jsonrpc` field of every outgoing message.
const JSON_RPC_VERSION: &str = "2.0";
/// Header prefix that precedes the byte length of each framed message.
const CONTENT_LEN_HEADER: &str = "Content-Length: ";

/// Callback run by the input task for an incoming message that carries a `method`.
/// It receives the message's optional `id`, the raw JSON text of its `params`, and an app context.
type NotificationHandler = Box<dyn Send + FnMut(Option<usize>, &str, AsyncAppContext)>;
/// One-shot callback run with the raw JSON text of a response's result, or with its error.
type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
 35
/// A connection to a language server that exchanges JSON-RPC messages over a pair of streams.
pub struct LanguageServer {
    /// Identifier supplied by the caller at construction time.
    server_id: usize,
    /// Counter from which the `id` of each outgoing request is drawn.
    next_id: AtomicUsize,
    /// Queue of serialized messages waiting to be framed and written by the output task.
    outbound_tx: channel::Sender<Vec<u8>>,
    /// Server name: the binary's file name at first, replaced by the name the server reports
    /// in its `initialize` response (when it reports one).
    name: String,
    /// Capabilities reported by the server in its `initialize` response.
    capabilities: ServerCapabilities,
    /// Handlers for incoming notifications *and* requests, keyed by method name.
    notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
    /// Handlers awaiting responses, keyed by request id. The map is replaced by `None` once
    /// either I/O task finishes or shutdown has run; new requests then fail.
    response_handlers: Arc<Mutex<Option<HashMap<usize, ResponseHandler>>>>,
    /// Background executor used to run the shutdown sequence when this value is dropped.
    executor: Arc<executor::Background>,
    /// The input and output tasks, in that order; taken (left as `None`) by `shutdown`.
    #[allow(clippy::type_complexity)]
    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
    /// Receiver awaited during shutdown; its sender is dropped by the output task once the
    /// outbound queue has been drained.
    output_done_rx: Mutex<Option<barrier::Receiver>>,
    /// Workspace root this server was started for.
    root_path: PathBuf,
    /// Handle of the spawned server process, kept only so it lives as long as this value.
    /// `None` when the server was built directly from streams.
    _server: Option<Child>,
}
 51
/// Guard for a registered notification/request handler.
/// Dropping it removes the handler stored under `method`; see `detach` to keep the handler.
pub struct Subscription {
    /// Method name the handler was registered under.
    method: &'static str,
    /// Shared handler table the entry is removed from on drop.
    notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
}
 56
/// Wire format of an outgoing JSON-RPC request.
#[derive(Serialize, Deserialize)]
struct Request<'a, T> {
    /// Protocol version string.
    jsonrpc: &'static str,
    /// Identifier used to match the eventual response to this request.
    id: usize,
    /// Name of the method being invoked.
    method: &'a str,
    /// Method-specific parameters.
    params: T,
}
 64
/// A JSON-RPC response whose result is kept as unparsed JSON.
/// Used to decode incoming responses and to report parameter-deserialization failures.
#[derive(Serialize, Deserialize)]
struct AnyResponse<'a> {
    /// Protocol version string.
    jsonrpc: &'a str,
    /// Identifier of the request this response answers.
    id: usize,
    /// Error reported instead of a result; `None` when absent from the message.
    #[serde(default)]
    error: Option<Error>,
    /// Raw JSON of the result, borrowed from the message buffer.
    #[serde(borrow)]
    result: Option<&'a RawValue>,
}
 74
/// Wire format of a response sent back for a request received from the peer.
/// Both `result` and `error` are always serialized (as `null` when `None`).
#[derive(Serialize)]
struct Response<T> {
    /// Protocol version string.
    jsonrpc: &'static str,
    /// Identifier copied from the request being answered.
    id: usize,
    /// Value produced by the request handler on success.
    result: Option<T>,
    /// Error produced by the request handler on failure.
    error: Option<Error>,
}
 82
/// Wire format of an outgoing JSON-RPC notification (a message without an `id`).
#[derive(Serialize, Deserialize)]
struct Notification<'a, T> {
    /// Protocol version string.
    jsonrpc: &'static str,
    /// Name of the notification method.
    #[serde(borrow)]
    method: &'a str,
    /// Method-specific parameters.
    params: T,
}
 90
/// Any incoming message that carries a `method`, with its parameters left unparsed.
/// An `id` is present when the message is a request and absent when it is a notification.
///
/// NOTE(review): `params` has no default, so a message that omits `params` does not
/// deserialize as this type — confirm the servers in use always send it.
#[derive(Deserialize)]
struct AnyNotification<'a> {
    /// Request identifier; `None` when the message has no `id`.
    #[serde(default)]
    id: Option<usize>,
    /// Method name, borrowed from the message buffer.
    #[serde(borrow)]
    method: &'a str,
    /// Raw JSON of the parameters, borrowed from the message buffer.
    #[serde(borrow)]
    params: &'a RawValue,
}
100
/// Error object carried by a JSON-RPC response; only the message text is modelled.
#[derive(Debug, Serialize, Deserialize)]
struct Error {
    /// Human-readable description of the failure.
    message: String,
}
105
106impl LanguageServer {
107    pub fn new<T: AsRef<std::ffi::OsStr>>(
108        server_id: usize,
109        binary_path: &Path,
110        args: &[T],
111        root_path: &Path,
112        cx: AsyncAppContext,
113    ) -> Result<Self> {
114        let working_dir = if root_path.is_dir() {
115            root_path
116        } else {
117            root_path.parent().unwrap_or_else(|| Path::new("/"))
118        };
119        let mut server = process::Command::new(binary_path)
120            .current_dir(working_dir)
121            .args(args)
122            .stdin(Stdio::piped())
123            .stdout(Stdio::piped())
124            .stderr(Stdio::inherit())
125            .kill_on_drop(true)
126            .spawn()?;
127
128        let stdin = server.stdin.take().unwrap();
129        let stout = server.stdout.take().unwrap();
130
131        let mut server = Self::new_internal(
132            server_id,
133            stdin,
134            stout,
135            Some(server),
136            root_path,
137            cx,
138            |notification| {
139                log::info!(
140                    "unhandled notification {}:\n{}",
141                    notification.method,
142                    serde_json::to_string_pretty(
143                        &Value::from_str(notification.params.get()).unwrap()
144                    )
145                    .unwrap()
146                );
147            },
148        );
149        if let Some(name) = binary_path.file_name() {
150            server.name = name.to_string_lossy().to_string();
151        }
152        Ok(server)
153    }
154
155    fn new_internal<Stdin, Stdout, F>(
156        server_id: usize,
157        stdin: Stdin,
158        stdout: Stdout,
159        server: Option<Child>,
160        root_path: &Path,
161        cx: AsyncAppContext,
162        mut on_unhandled_notification: F,
163    ) -> Self
164    where
165        Stdin: AsyncWrite + Unpin + Send + 'static,
166        Stdout: AsyncRead + Unpin + Send + 'static,
167        F: FnMut(AnyNotification) + 'static + Send,
168    {
169        let mut stdin = BufWriter::new(stdin);
170        let mut stdout = BufReader::new(stdout);
171        let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
172        let notification_handlers =
173            Arc::new(Mutex::new(HashMap::<_, NotificationHandler>::default()));
174        let response_handlers =
175            Arc::new(Mutex::new(Some(HashMap::<_, ResponseHandler>::default())));
176        let input_task = cx.spawn(|cx| {
177            let notification_handlers = notification_handlers.clone();
178            let response_handlers = response_handlers.clone();
179            async move {
180                let _clear_response_handlers = util::defer({
181                    let response_handlers = response_handlers.clone();
182                    move || {
183                        response_handlers.lock().take();
184                    }
185                });
186                let mut buffer = Vec::new();
187                loop {
188                    buffer.clear();
189                    stdout.read_until(b'\n', &mut buffer).await?;
190                    stdout.read_until(b'\n', &mut buffer).await?;
191                    let message_len: usize = std::str::from_utf8(&buffer)?
192                        .strip_prefix(CONTENT_LEN_HEADER)
193                        .ok_or_else(|| anyhow!("invalid header"))?
194                        .trim_end()
195                        .parse()?;
196
197                    buffer.resize(message_len, 0);
198                    stdout.read_exact(&mut buffer).await?;
199                    log::trace!("incoming message:{}", String::from_utf8_lossy(&buffer));
200
201                    if let Ok(msg) = serde_json::from_slice::<AnyNotification>(&buffer) {
202                        if let Some(handler) = notification_handlers.lock().get_mut(msg.method) {
203                            handler(msg.id, msg.params.get(), cx.clone());
204                        } else {
205                            on_unhandled_notification(msg);
206                        }
207                    } else if let Ok(AnyResponse {
208                        id, error, result, ..
209                    }) = serde_json::from_slice(&buffer)
210                    {
211                        if let Some(handler) = response_handlers
212                            .lock()
213                            .as_mut()
214                            .and_then(|handlers| handlers.remove(&id))
215                        {
216                            if let Some(error) = error {
217                                handler(Err(error));
218                            } else if let Some(result) = result {
219                                handler(Ok(result.get()));
220                            } else {
221                                handler(Ok("null"));
222                            }
223                        }
224                    } else {
225                        return Err(anyhow!(
226                            "failed to deserialize message:\n{}",
227                            std::str::from_utf8(&buffer)?
228                        ));
229                    }
230
231                    // Don't starve the main thread when receiving lots of messages at once.
232                    smol::future::yield_now().await;
233                }
234            }
235            .log_err()
236        });
237        let (output_done_tx, output_done_rx) = barrier::channel();
238        let output_task = cx.background().spawn({
239            let response_handlers = response_handlers.clone();
240            async move {
241                let _clear_response_handlers = util::defer({
242                    let response_handlers = response_handlers.clone();
243                    move || {
244                        response_handlers.lock().take();
245                    }
246                });
247                let mut content_len_buffer = Vec::new();
248                while let Ok(message) = outbound_rx.recv().await {
249                    log::trace!("outgoing message:{}", String::from_utf8_lossy(&message));
250                    content_len_buffer.clear();
251                    write!(content_len_buffer, "{}", message.len()).unwrap();
252                    stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
253                    stdin.write_all(&content_len_buffer).await?;
254                    stdin.write_all("\r\n\r\n".as_bytes()).await?;
255                    stdin.write_all(&message).await?;
256                    stdin.flush().await?;
257                }
258                drop(output_done_tx);
259                Ok(())
260            }
261            .log_err()
262        });
263
264        Self {
265            server_id,
266            notification_handlers,
267            response_handlers,
268            name: Default::default(),
269            capabilities: Default::default(),
270            next_id: Default::default(),
271            outbound_tx,
272            executor: cx.background(),
273            io_tasks: Mutex::new(Some((input_task, output_task))),
274            output_done_rx: Mutex::new(Some(output_done_rx)),
275            root_path: root_path.to_path_buf(),
276            _server: server,
277        }
278    }
279
280    /// Initializes a language server.
281    /// Note that `options` is used directly to construct [`InitializeParams`],
282    /// which is why it is owned.
283    pub async fn initialize(mut self, options: Option<Value>) -> Result<Arc<Self>> {
284        let root_uri = Url::from_file_path(&self.root_path).unwrap();
285        #[allow(deprecated)]
286        let params = InitializeParams {
287            process_id: Default::default(),
288            root_path: Default::default(),
289            root_uri: Some(root_uri.clone()),
290            initialization_options: options,
291            capabilities: ClientCapabilities {
292                workspace: Some(WorkspaceClientCapabilities {
293                    configuration: Some(true),
294                    did_change_configuration: Some(DynamicRegistrationClientCapabilities {
295                        dynamic_registration: Some(true),
296                    }),
297                    ..Default::default()
298                }),
299                text_document: Some(TextDocumentClientCapabilities {
300                    definition: Some(GotoCapability {
301                        link_support: Some(true),
302                        ..Default::default()
303                    }),
304                    code_action: Some(CodeActionClientCapabilities {
305                        code_action_literal_support: Some(CodeActionLiteralSupport {
306                            code_action_kind: CodeActionKindLiteralSupport {
307                                value_set: vec![
308                                    CodeActionKind::REFACTOR.as_str().into(),
309                                    CodeActionKind::QUICKFIX.as_str().into(),
310                                    CodeActionKind::SOURCE.as_str().into(),
311                                ],
312                            },
313                        }),
314                        data_support: Some(true),
315                        resolve_support: Some(CodeActionCapabilityResolveSupport {
316                            properties: vec!["edit".to_string(), "command".to_string()],
317                        }),
318                        ..Default::default()
319                    }),
320                    completion: Some(CompletionClientCapabilities {
321                        completion_item: Some(CompletionItemCapability {
322                            snippet_support: Some(true),
323                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
324                                properties: vec!["additionalTextEdits".to_string()],
325                            }),
326                            ..Default::default()
327                        }),
328                        ..Default::default()
329                    }),
330                    rename: Some(RenameClientCapabilities {
331                        prepare_support: Some(true),
332                        ..Default::default()
333                    }),
334                    hover: Some(HoverClientCapabilities {
335                        content_format: Some(vec![MarkupKind::Markdown]),
336                        ..Default::default()
337                    }),
338                    ..Default::default()
339                }),
340                experimental: Some(json!({
341                    "serverStatusNotification": true,
342                })),
343                window: Some(WindowClientCapabilities {
344                    work_done_progress: Some(true),
345                    ..Default::default()
346                }),
347                ..Default::default()
348            },
349            trace: Default::default(),
350            workspace_folders: Some(vec![WorkspaceFolder {
351                uri: root_uri,
352                name: Default::default(),
353            }]),
354            client_info: Default::default(),
355            locale: Default::default(),
356        };
357
358        let response = self.request::<request::Initialize>(params).await?;
359        if let Some(info) = response.server_info {
360            self.name = info.name;
361        }
362        self.capabilities = response.capabilities;
363
364        self.notify::<notification::Initialized>(InitializedParams {})?;
365        Ok(Arc::new(self))
366    }
367
368    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
369        if let Some(tasks) = self.io_tasks.lock().take() {
370            let response_handlers = self.response_handlers.clone();
371            let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
372            let outbound_tx = self.outbound_tx.clone();
373            let mut output_done = self.output_done_rx.lock().take().unwrap();
374            let shutdown_request = Self::request_internal::<request::Shutdown>(
375                &next_id,
376                &response_handlers,
377                &outbound_tx,
378                (),
379            );
380            let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
381            outbound_tx.close();
382            Some(
383                async move {
384                    log::debug!("language server shutdown started");
385                    shutdown_request.await?;
386                    response_handlers.lock().take();
387                    exit?;
388                    output_done.recv().await;
389                    log::debug!("language server shutdown finished");
390                    drop(tasks);
391                    Ok(())
392                }
393                .log_err(),
394            )
395        } else {
396            None
397        }
398    }
399
400    #[must_use]
401    pub fn on_notification<T, F>(&self, f: F) -> Subscription
402    where
403        T: notification::Notification,
404        F: 'static + Send + FnMut(T::Params, AsyncAppContext),
405    {
406        self.on_custom_notification(T::METHOD, f)
407    }
408
409    #[must_use]
410    pub fn on_request<T, F, Fut>(&self, f: F) -> Subscription
411    where
412        T: request::Request,
413        T::Params: 'static + Send,
414        F: 'static + Send + FnMut(T::Params, AsyncAppContext) -> Fut,
415        Fut: 'static + Future<Output = Result<T::Result>>,
416    {
417        self.on_custom_request(T::METHOD, f)
418    }
419
420    pub fn remove_request_handler<T: request::Request>(&self) {
421        self.notification_handlers.lock().remove(T::METHOD);
422    }
423
424    #[must_use]
425    pub fn on_custom_notification<Params, F>(&self, method: &'static str, mut f: F) -> Subscription
426    where
427        F: 'static + Send + FnMut(Params, AsyncAppContext),
428        Params: DeserializeOwned,
429    {
430        let prev_handler = self.notification_handlers.lock().insert(
431            method,
432            Box::new(move |_, params, cx| {
433                if let Some(params) = serde_json::from_str(params).log_err() {
434                    f(params, cx);
435                }
436            }),
437        );
438        assert!(
439            prev_handler.is_none(),
440            "registered multiple handlers for the same LSP method"
441        );
442        Subscription {
443            method,
444            notification_handlers: self.notification_handlers.clone(),
445        }
446    }
447
448    #[must_use]
449    pub fn on_custom_request<Params, Res, Fut, F>(
450        &self,
451        method: &'static str,
452        mut f: F,
453    ) -> Subscription
454    where
455        F: 'static + Send + FnMut(Params, AsyncAppContext) -> Fut,
456        Fut: 'static + Future<Output = Result<Res>>,
457        Params: DeserializeOwned + Send + 'static,
458        Res: Serialize,
459    {
460        let outbound_tx = self.outbound_tx.clone();
461        let prev_handler = self.notification_handlers.lock().insert(
462            method,
463            Box::new(move |id, params, cx| {
464                if let Some(id) = id {
465                    match serde_json::from_str(params) {
466                        Ok(params) => {
467                            let response = f(params, cx.clone());
468                            cx.foreground()
469                                .spawn({
470                                    let outbound_tx = outbound_tx.clone();
471                                    async move {
472                                        let response = match response.await {
473                                            Ok(result) => Response {
474                                                jsonrpc: JSON_RPC_VERSION,
475                                                id,
476                                                result: Some(result),
477                                                error: None,
478                                            },
479                                            Err(error) => Response {
480                                                jsonrpc: JSON_RPC_VERSION,
481                                                id,
482                                                result: None,
483                                                error: Some(Error {
484                                                    message: error.to_string(),
485                                                }),
486                                            },
487                                        };
488                                        if let Some(response) =
489                                            serde_json::to_vec(&response).log_err()
490                                        {
491                                            outbound_tx.try_send(response).ok();
492                                        }
493                                    }
494                                })
495                                .detach();
496                        }
497                        Err(error) => {
498                            log::error!(
499                                "error deserializing {} request: {:?}, message: {:?}",
500                                method,
501                                error,
502                                params
503                            );
504                            let response = AnyResponse {
505                                jsonrpc: JSON_RPC_VERSION,
506                                id,
507                                result: None,
508                                error: Some(Error {
509                                    message: error.to_string(),
510                                }),
511                            };
512                            if let Some(response) = serde_json::to_vec(&response).log_err() {
513                                outbound_tx.try_send(response).ok();
514                            }
515                        }
516                    }
517                }
518            }),
519        );
520        assert!(
521            prev_handler.is_none(),
522            "registered multiple handlers for the same LSP method"
523        );
524        Subscription {
525            method,
526            notification_handlers: self.notification_handlers.clone(),
527        }
528    }
529
530    pub fn name<'a>(self: &'a Arc<Self>) -> &'a str {
531        &self.name
532    }
533
534    pub fn capabilities<'a>(self: &'a Arc<Self>) -> &'a ServerCapabilities {
535        &self.capabilities
536    }
537
    /// Returns the identifier this server was constructed with.
    pub fn server_id(&self) -> usize {
        self.server_id
    }
541
    /// Returns the root path this server was constructed with.
    pub fn root_path(&self) -> &PathBuf {
        &self.root_path
    }
545
546    pub fn request<T: request::Request>(
547        &self,
548        params: T::Params,
549    ) -> impl Future<Output = Result<T::Result>>
550    where
551        T::Result: 'static + Send,
552    {
553        Self::request_internal::<T>(
554            &self.next_id,
555            &self.response_handlers,
556            &self.outbound_tx,
557            params,
558        )
559    }
560
561    fn request_internal<T: request::Request>(
562        next_id: &AtomicUsize,
563        response_handlers: &Mutex<Option<HashMap<usize, ResponseHandler>>>,
564        outbound_tx: &channel::Sender<Vec<u8>>,
565        params: T::Params,
566    ) -> impl 'static + Future<Output = Result<T::Result>>
567    where
568        T::Result: 'static + Send,
569    {
570        let id = next_id.fetch_add(1, SeqCst);
571        let message = serde_json::to_vec(&Request {
572            jsonrpc: JSON_RPC_VERSION,
573            id,
574            method: T::METHOD,
575            params,
576        })
577        .unwrap();
578
579        let (tx, rx) = oneshot::channel();
580        let handle_response = response_handlers
581            .lock()
582            .as_mut()
583            .ok_or_else(|| anyhow!("server shut down"))
584            .map(|handlers| {
585                handlers.insert(
586                    id,
587                    Box::new(move |result| {
588                        let response = match result {
589                            Ok(response) => serde_json::from_str(response)
590                                .context("failed to deserialize response"),
591                            Err(error) => Err(anyhow!("{}", error.message)),
592                        };
593                        let _ = tx.send(response);
594                    }),
595                );
596            });
597
598        let send = outbound_tx
599            .try_send(message)
600            .context("failed to write to language server's stdin");
601
602        async move {
603            handle_response?;
604            send?;
605            rx.await?
606        }
607    }
608
609    pub fn notify<T: notification::Notification>(&self, params: T::Params) -> Result<()> {
610        Self::notify_internal::<T>(&self.outbound_tx, params)
611    }
612
613    fn notify_internal<T: notification::Notification>(
614        outbound_tx: &channel::Sender<Vec<u8>>,
615        params: T::Params,
616    ) -> Result<()> {
617        let message = serde_json::to_vec(&Notification {
618            jsonrpc: JSON_RPC_VERSION,
619            method: T::METHOD,
620            params,
621        })
622        .unwrap();
623        outbound_tx.try_send(message)?;
624        Ok(())
625    }
626}
627
628impl Drop for LanguageServer {
629    fn drop(&mut self) {
630        if let Some(shutdown) = self.shutdown() {
631            self.executor.spawn(shutdown).detach();
632        }
633    }
634}
635
impl Subscription {
    /// Consumes the subscription while leaving its handler registered.
    pub fn detach(mut self) {
        // `Drop` removes the entry keyed by `self.method`; pointing it at the empty string
        // makes that removal target "" instead of the real method.
        // NOTE(review): assumes no handler is ever registered under "" — confirm.
        self.method = "";
    }
}
641
642impl Drop for Subscription {
643    fn drop(&mut self) {
644        self.notification_handlers.lock().remove(self.method);
645    }
646}
647
/// Test double that plays the server side of a connection created by `LanguageServer::fake`.
#[cfg(any(test, feature = "test-support"))]
#[derive(Clone)]
pub struct FakeLanguageServer {
    /// Connection endpoint used to talk to the client under test.
    pub server: Arc<LanguageServer>,
    /// Incoming messages that had no registered handler, as `(method, raw params)` pairs.
    notifications_rx: channel::Receiver<(String, String)>,
}
654
655#[cfg(any(test, feature = "test-support"))]
656impl LanguageServer {
657    pub fn full_capabilities() -> ServerCapabilities {
658        ServerCapabilities {
659            document_highlight_provider: Some(OneOf::Left(true)),
660            code_action_provider: Some(CodeActionProviderCapability::Simple(true)),
661            document_formatting_provider: Some(OneOf::Left(true)),
662            document_range_formatting_provider: Some(OneOf::Left(true)),
663            ..Default::default()
664        }
665    }
666
667    pub fn fake(
668        name: String,
669        capabilities: ServerCapabilities,
670        cx: AsyncAppContext,
671    ) -> (Self, FakeLanguageServer) {
672        let (stdin_writer, stdin_reader) = async_pipe::pipe();
673        let (stdout_writer, stdout_reader) = async_pipe::pipe();
674        let (notifications_tx, notifications_rx) = channel::unbounded();
675
676        let server = Self::new_internal(
677            0,
678            stdin_writer,
679            stdout_reader,
680            None,
681            Path::new("/"),
682            cx.clone(),
683            |_| {},
684        );
685        let fake = FakeLanguageServer {
686            server: Arc::new(Self::new_internal(
687                0,
688                stdout_writer,
689                stdin_reader,
690                None,
691                Path::new("/"),
692                cx,
693                move |msg| {
694                    notifications_tx
695                        .try_send((msg.method.to_string(), msg.params.get().to_string()))
696                        .ok();
697                },
698            )),
699            notifications_rx,
700        };
701        fake.handle_request::<request::Initialize, _, _>({
702            let capabilities = capabilities;
703            move |_, _| {
704                let capabilities = capabilities.clone();
705                let name = name.clone();
706                async move {
707                    Ok(InitializeResult {
708                        capabilities,
709                        server_info: Some(ServerInfo {
710                            name,
711                            ..Default::default()
712                        }),
713                    })
714                }
715            }
716        });
717
718        (server, fake)
719    }
720}
721
722#[cfg(any(test, feature = "test-support"))]
723impl FakeLanguageServer {
724    pub fn notify<T: notification::Notification>(&self, params: T::Params) {
725        self.server.notify::<T>(params).ok();
726    }
727
728    pub async fn request<T>(&self, params: T::Params) -> Result<T::Result>
729    where
730        T: request::Request,
731        T::Result: 'static + Send,
732    {
733        self.server.request::<T>(params).await
734    }
735
736    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
737        self.try_receive_notification::<T>().await.unwrap()
738    }
739
740    pub async fn try_receive_notification<T: notification::Notification>(
741        &mut self,
742    ) -> Option<T::Params> {
743        use futures::StreamExt as _;
744
745        loop {
746            let (method, params) = self.notifications_rx.next().await?;
747            if method == T::METHOD {
748                return Some(serde_json::from_str::<T::Params>(&params).unwrap());
749            } else {
750                log::info!("skipping message in fake language server {:?}", params);
751            }
752        }
753    }
754
755    pub fn handle_request<T, F, Fut>(
756        &self,
757        mut handler: F,
758    ) -> futures::channel::mpsc::UnboundedReceiver<()>
759    where
760        T: 'static + request::Request,
761        T::Params: 'static + Send,
762        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> Fut,
763        Fut: 'static + Send + Future<Output = Result<T::Result>>,
764    {
765        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
766        self.server.remove_request_handler::<T>();
767        self.server
768            .on_request::<T, _, _>(move |params, cx| {
769                let result = handler(params, cx.clone());
770                let responded_tx = responded_tx.clone();
771                async move {
772                    cx.background().simulate_random_delay().await;
773                    let result = result.await;
774                    responded_tx.unbounded_send(()).ok();
775                    result
776                }
777            })
778            .detach();
779        responded_rx
780    }
781
782    pub fn remove_request_handler<T>(&mut self)
783    where
784        T: 'static + request::Request,
785    {
786        self.server.remove_request_handler::<T>();
787    }
788
789    pub async fn start_progress(&self, token: impl Into<String>) {
790        let token = token.into();
791        self.request::<request::WorkDoneProgressCreate>(WorkDoneProgressCreateParams {
792            token: NumberOrString::String(token.clone()),
793        })
794        .await
795        .unwrap();
796        self.notify::<notification::Progress>(ProgressParams {
797            token: NumberOrString::String(token),
798            value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
799        });
800    }
801
802    pub fn end_progress(&self, token: impl Into<String>) {
803        self.notify::<notification::Progress>(ProgressParams {
804            token: NumberOrString::String(token.into()),
805            value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
806        });
807    }
808}
809
810#[cfg(test)]
811mod tests {
812    use super::*;
813    use gpui::TestAppContext;
814
815    #[ctor::ctor]
816    fn init_logger() {
817        if std::env::var("RUST_LOG").is_ok() {
818            env_logger::init();
819        }
820    }
821
    /// Exercises a client/fake pair end to end: initialization, a notification in each
    /// direction, and the shutdown sequence triggered by dropping the client.
    #[gpui::test]
    async fn test_fake(cx: &mut TestAppContext) {
        let (server, mut fake) =
            LanguageServer::fake("the-lsp".to_string(), Default::default(), cx.to_async());

        // Forward notifications received by the client into channels the test can await.
        let (message_tx, message_rx) = channel::unbounded();
        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
        server
            .on_notification::<notification::ShowMessage, _>(move |params, _| {
                message_tx.try_send(params).unwrap()
            })
            .detach();
        server
            .on_notification::<notification::PublishDiagnostics, _>(move |params, _| {
                diagnostics_tx.try_send(params).unwrap()
            })
            .detach();

        // Client -> fake: the fake should observe the notification sent by the client.
        let server = server.initialize(None).await.unwrap();
        server
            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
                text_document: TextDocumentItem::new(
                    Url::from_str("file://a/b").unwrap(),
                    "rust".to_string(),
                    0,
                    "".to_string(),
                ),
            })
            .unwrap();
        assert_eq!(
            fake.receive_notification::<notification::DidOpenTextDocument>()
                .await
                .text_document
                .uri
                .as_str(),
            "file://a/b"
        );

        // Fake -> client: both notifications should reach the handlers registered above.
        fake.notify::<notification::ShowMessage>(ShowMessageParams {
            typ: MessageType::ERROR,
            message: "ok".to_string(),
        });
        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
            uri: Url::from_str("file://b/c").unwrap(),
            version: Some(5),
            diagnostics: vec![],
        });
        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
        assert_eq!(
            diagnostics_rx.recv().await.unwrap().uri.as_str(),
            "file://b/c"
        );

        // Answer the shutdown request so the client's shutdown sequence can proceed.
        fake.handle_request::<request::Shutdown, _, _>(|_, _| async move { Ok(()) });

        // Dropping the client starts shutdown, which ends with an `exit` notification.
        drop(server);
        fake.receive_notification::<notification::Exit>().await;
    }
880}