lsp.rs

  1use anyhow::{anyhow, Context, Result};
  2use collections::HashMap;
  3use futures::{channel::oneshot, io::BufWriter, AsyncRead, AsyncWrite};
  4use gpui::{executor, Task};
  5use parking_lot::{Mutex, RwLock};
  6use postage::{barrier, prelude::Stream};
  7use serde::{de::DeserializeOwned, Deserialize, Serialize};
  8use serde_json::{json, value::RawValue, Value};
  9use smol::{
 10    channel,
 11    io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
 12    process::Command,
 13};
 14use std::{
 15    future::Future,
 16    io::Write,
 17    path::PathBuf,
 18    str::FromStr,
 19    sync::{
 20        atomic::{AtomicUsize, Ordering::SeqCst},
 21        Arc,
 22    },
 23};
 24use std::{path::Path, process::Stdio};
 25use util::TryFutureExt;
 26
 27pub use lsp_types::*;
 28
 29const JSON_RPC_VERSION: &'static str = "2.0";
 30const CONTENT_LEN_HEADER: &'static str = "Content-Length: ";
 31
 32type NotificationHandler =
 33    Box<dyn Send + Sync + FnMut(Option<usize>, &str, &mut channel::Sender<Vec<u8>>) -> Result<()>>;
 34type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
 35
 36pub struct LanguageServer {
 37    server_id: usize,
 38    next_id: AtomicUsize,
 39    outbound_tx: channel::Sender<Vec<u8>>,
 40    name: String,
 41    capabilities: ServerCapabilities,
 42    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
 43    response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
 44    executor: Arc<executor::Background>,
 45    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
 46    output_done_rx: Mutex<Option<barrier::Receiver>>,
 47    root_path: PathBuf,
 48    options: Option<Value>,
 49}
 50
 51pub struct Subscription {
 52    method: &'static str,
 53    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
 54}
 55
 56#[derive(Serialize, Deserialize)]
 57struct Request<'a, T> {
 58    jsonrpc: &'a str,
 59    id: usize,
 60    method: &'a str,
 61    params: T,
 62}
 63
 64#[cfg(any(test, feature = "test-support"))]
 65#[derive(Deserialize)]
 66struct AnyRequest<'a> {
 67    id: usize,
 68    #[serde(borrow)]
 69    jsonrpc: &'a str,
 70    #[serde(borrow)]
 71    method: &'a str,
 72    #[serde(borrow)]
 73    params: &'a RawValue,
 74}
 75
 76#[derive(Serialize, Deserialize)]
 77struct AnyResponse<'a> {
 78    id: usize,
 79    #[serde(default)]
 80    error: Option<Error>,
 81    #[serde(borrow)]
 82    result: Option<&'a RawValue>,
 83}
 84
 85#[derive(Serialize)]
 86struct Response<T> {
 87    id: usize,
 88    result: T,
 89}
 90
 91#[derive(Serialize, Deserialize)]
 92struct Notification<'a, T> {
 93    #[serde(borrow)]
 94    jsonrpc: &'a str,
 95    #[serde(borrow)]
 96    method: &'a str,
 97    params: T,
 98}
 99
100#[derive(Deserialize)]
101struct AnyNotification<'a> {
102    #[serde(default)]
103    id: Option<usize>,
104    #[serde(borrow)]
105    method: &'a str,
106    #[serde(borrow)]
107    params: &'a RawValue,
108}
109
110#[derive(Debug, Serialize, Deserialize)]
111struct Error {
112    message: String,
113}
114
115impl LanguageServer {
116    pub fn new(
117        server_id: usize,
118        binary_path: &Path,
119        args: &[&str],
120        root_path: &Path,
121        options: Option<Value>,
122        background: Arc<executor::Background>,
123    ) -> Result<Self> {
124        let working_dir = if root_path.is_dir() {
125            root_path
126        } else {
127            root_path.parent().unwrap_or(Path::new("/"))
128        };
129        let mut server = Command::new(binary_path)
130            .current_dir(working_dir)
131            .args(args)
132            .stdin(Stdio::piped())
133            .stdout(Stdio::piped())
134            .stderr(Stdio::inherit())
135            .spawn()?;
136        let stdin = server.stdin.take().unwrap();
137        let stdout = server.stdout.take().unwrap();
138        let mut server =
139            Self::new_internal(server_id, stdin, stdout, root_path, options, background);
140        if let Some(name) = binary_path.file_name() {
141            server.name = name.to_string_lossy().to_string();
142        }
143        Ok(server)
144    }
145
146    fn new_internal<Stdin, Stdout>(
147        server_id: usize,
148        stdin: Stdin,
149        stdout: Stdout,
150        root_path: &Path,
151        options: Option<Value>,
152        executor: Arc<executor::Background>,
153    ) -> Self
154    where
155        Stdin: AsyncWrite + Unpin + Send + 'static,
156        Stdout: AsyncRead + Unpin + Send + 'static,
157    {
158        let mut stdin = BufWriter::new(stdin);
159        let mut stdout = BufReader::new(stdout);
160        let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
161        let notification_handlers =
162            Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::default()));
163        let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::default()));
164        let input_task = executor.spawn(
165            {
166                let notification_handlers = notification_handlers.clone();
167                let response_handlers = response_handlers.clone();
168                let mut outbound_tx = outbound_tx.clone();
169                async move {
170                    let _clear_response_handlers = ClearResponseHandlers(response_handlers.clone());
171                    let mut buffer = Vec::new();
172                    loop {
173                        buffer.clear();
174                        stdout.read_until(b'\n', &mut buffer).await?;
175                        stdout.read_until(b'\n', &mut buffer).await?;
176                        let message_len: usize = std::str::from_utf8(&buffer)?
177                            .strip_prefix(CONTENT_LEN_HEADER)
178                            .ok_or_else(|| anyhow!("invalid header"))?
179                            .trim_end()
180                            .parse()?;
181
182                        buffer.resize(message_len, 0);
183                        stdout.read_exact(&mut buffer).await?;
184
185                        if let Ok(AnyNotification { id, method, params }) =
186                            serde_json::from_slice(&buffer)
187                        {
188                            if let Some(handler) = notification_handlers.write().get_mut(method) {
189                                if let Err(e) = handler(id, params.get(), &mut outbound_tx) {
190                                    log::error!("error handling {} message: {:?}", method, e);
191                                }
192                            } else {
193                                log::info!(
194                                    "unhandled notification {}:\n{}",
195                                    method,
196                                    serde_json::to_string_pretty(
197                                        &Value::from_str(params.get()).unwrap()
198                                    )
199                                    .unwrap()
200                                );
201                            }
202                        } else if let Ok(AnyResponse { id, error, result }) =
203                            serde_json::from_slice(&buffer)
204                        {
205                            if let Some(handler) = response_handlers.lock().remove(&id) {
206                                if let Some(error) = error {
207                                    handler(Err(error));
208                                } else if let Some(result) = result {
209                                    handler(Ok(result.get()));
210                                } else {
211                                    handler(Ok("null"));
212                                }
213                            }
214                        } else {
215                            return Err(anyhow!(
216                                "failed to deserialize message:\n{}",
217                                std::str::from_utf8(&buffer)?
218                            ));
219                        }
220                    }
221                }
222            }
223            .log_err(),
224        );
225        let (output_done_tx, output_done_rx) = barrier::channel();
226        let output_task = executor.spawn({
227            let response_handlers = response_handlers.clone();
228            async move {
229                let _clear_response_handlers = ClearResponseHandlers(response_handlers);
230                let mut content_len_buffer = Vec::new();
231                while let Ok(message) = outbound_rx.recv().await {
232                    content_len_buffer.clear();
233                    write!(content_len_buffer, "{}", message.len()).unwrap();
234                    stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
235                    stdin.write_all(&content_len_buffer).await?;
236                    stdin.write_all("\r\n\r\n".as_bytes()).await?;
237                    stdin.write_all(&message).await?;
238                    stdin.flush().await?;
239                }
240                drop(output_done_tx);
241                Ok(())
242            }
243            .log_err()
244        });
245
246        Self {
247            server_id,
248            notification_handlers,
249            response_handlers,
250            name: Default::default(),
251            capabilities: Default::default(),
252            next_id: Default::default(),
253            outbound_tx,
254            executor: executor.clone(),
255            io_tasks: Mutex::new(Some((input_task, output_task))),
256            output_done_rx: Mutex::new(Some(output_done_rx)),
257            root_path: root_path.to_path_buf(),
258            options,
259        }
260    }
261
262    pub async fn initialize(mut self) -> Result<Arc<Self>> {
263        let options = self.options.take();
264        let mut this = Arc::new(self);
265        let root_uri = Url::from_file_path(&this.root_path).unwrap();
266        #[allow(deprecated)]
267        let params = InitializeParams {
268            process_id: Default::default(),
269            root_path: Default::default(),
270            root_uri: Some(root_uri),
271            initialization_options: options,
272            capabilities: ClientCapabilities {
273                workspace: Some(WorkspaceClientCapabilities {
274                    configuration: Some(true),
275                    did_change_configuration: Some(DynamicRegistrationClientCapabilities {
276                        dynamic_registration: Some(true),
277                    }),
278                    ..Default::default()
279                }),
280                text_document: Some(TextDocumentClientCapabilities {
281                    definition: Some(GotoCapability {
282                        link_support: Some(true),
283                        ..Default::default()
284                    }),
285                    code_action: Some(CodeActionClientCapabilities {
286                        code_action_literal_support: Some(CodeActionLiteralSupport {
287                            code_action_kind: CodeActionKindLiteralSupport {
288                                value_set: vec![
289                                    CodeActionKind::REFACTOR.as_str().into(),
290                                    CodeActionKind::QUICKFIX.as_str().into(),
291                                ],
292                            },
293                        }),
294                        data_support: Some(true),
295                        resolve_support: Some(CodeActionCapabilityResolveSupport {
296                            properties: vec!["edit".to_string()],
297                        }),
298                        ..Default::default()
299                    }),
300                    completion: Some(CompletionClientCapabilities {
301                        completion_item: Some(CompletionItemCapability {
302                            snippet_support: Some(true),
303                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
304                                properties: vec!["additionalTextEdits".to_string()],
305                            }),
306                            ..Default::default()
307                        }),
308                        ..Default::default()
309                    }),
310                    ..Default::default()
311                }),
312                experimental: Some(json!({
313                    "serverStatusNotification": true,
314                })),
315                window: Some(WindowClientCapabilities {
316                    work_done_progress: Some(true),
317                    ..Default::default()
318                }),
319                ..Default::default()
320            },
321            trace: Default::default(),
322            workspace_folders: Default::default(),
323            client_info: Default::default(),
324            locale: Default::default(),
325        };
326
327        let response = this.request::<request::Initialize>(params).await?;
328        {
329            let this = Arc::get_mut(&mut this).unwrap();
330            if let Some(info) = response.server_info {
331                this.name = info.name;
332            }
333            this.capabilities = response.capabilities;
334        }
335        this.notify::<notification::Initialized>(InitializedParams {})?;
336        Ok(this)
337    }
338
339    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
340        if let Some(tasks) = self.io_tasks.lock().take() {
341            let response_handlers = self.response_handlers.clone();
342            let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
343            let outbound_tx = self.outbound_tx.clone();
344            let mut output_done = self.output_done_rx.lock().take().unwrap();
345            let shutdown_request = Self::request_internal::<request::Shutdown>(
346                &next_id,
347                &response_handlers,
348                &outbound_tx,
349                (),
350            );
351            let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
352            outbound_tx.close();
353            Some(
354                async move {
355                    log::debug!("language server shutdown started");
356                    shutdown_request.await?;
357                    response_handlers.lock().clear();
358                    exit?;
359                    output_done.recv().await;
360                    log::debug!("language server shutdown finished");
361                    drop(tasks);
362                    Ok(())
363                }
364                .log_err(),
365            )
366        } else {
367            None
368        }
369    }
370
371    pub fn on_notification<T, F>(&mut self, f: F) -> Subscription
372    where
373        T: notification::Notification,
374        F: 'static + Send + Sync + FnMut(T::Params),
375    {
376        self.on_custom_notification(T::METHOD, f)
377    }
378
379    pub fn on_request<T, F>(&mut self, f: F) -> Subscription
380    where
381        T: request::Request,
382        F: 'static + Send + Sync + FnMut(T::Params) -> Result<T::Result>,
383    {
384        self.on_custom_request(T::METHOD, f)
385    }
386
387    pub fn on_custom_notification<Params, F>(
388        &mut self,
389        method: &'static str,
390        mut f: F,
391    ) -> Subscription
392    where
393        F: 'static + Send + Sync + FnMut(Params),
394        Params: DeserializeOwned,
395    {
396        let prev_handler = self.notification_handlers.write().insert(
397            method,
398            Box::new(move |_, params, _| {
399                let params = serde_json::from_str(params)?;
400                f(params);
401                Ok(())
402            }),
403        );
404        assert!(
405            prev_handler.is_none(),
406            "registered multiple handlers for the same LSP method"
407        );
408        Subscription {
409            method,
410            notification_handlers: self.notification_handlers.clone(),
411        }
412    }
413
414    pub fn on_custom_request<Params, Res, F>(
415        &mut self,
416        method: &'static str,
417        mut f: F,
418    ) -> Subscription
419    where
420        F: 'static + Send + Sync + FnMut(Params) -> Result<Res>,
421        Params: DeserializeOwned,
422        Res: Serialize,
423    {
424        let prev_handler = self.notification_handlers.write().insert(
425            method,
426            Box::new(move |id, params, tx| {
427                if let Some(id) = id {
428                    let params = serde_json::from_str(params)?;
429                    let result = f(params)?;
430                    let response = serde_json::to_vec(&Response { id, result })?;
431                    tx.try_send(response)?;
432                }
433                Ok(())
434            }),
435        );
436        assert!(
437            prev_handler.is_none(),
438            "registered multiple handlers for the same LSP method"
439        );
440        Subscription {
441            method,
442            notification_handlers: self.notification_handlers.clone(),
443        }
444    }
445
446    pub fn name<'a>(self: &'a Arc<Self>) -> &'a str {
447        &self.name
448    }
449
450    pub fn capabilities<'a>(self: &'a Arc<Self>) -> &'a ServerCapabilities {
451        &self.capabilities
452    }
453
454    pub fn server_id(&self) -> usize {
455        self.server_id
456    }
457
458    pub fn request<T: request::Request>(
459        self: &Arc<Self>,
460        params: T::Params,
461    ) -> impl Future<Output = Result<T::Result>>
462    where
463        T::Result: 'static + Send,
464    {
465        Self::request_internal::<T>(
466            &self.next_id,
467            &self.response_handlers,
468            &self.outbound_tx,
469            params,
470        )
471    }
472
473    fn request_internal<T: request::Request>(
474        next_id: &AtomicUsize,
475        response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
476        outbound_tx: &channel::Sender<Vec<u8>>,
477        params: T::Params,
478    ) -> impl 'static + Future<Output = Result<T::Result>>
479    where
480        T::Result: 'static + Send,
481    {
482        let id = next_id.fetch_add(1, SeqCst);
483        let message = serde_json::to_vec(&Request {
484            jsonrpc: JSON_RPC_VERSION,
485            id,
486            method: T::METHOD,
487            params,
488        })
489        .unwrap();
490
491        let send = outbound_tx
492            .try_send(message)
493            .context("failed to write to language server's stdin");
494
495        let (tx, rx) = oneshot::channel();
496        response_handlers.lock().insert(
497            id,
498            Box::new(move |result| {
499                let response = match result {
500                    Ok(response) => {
501                        serde_json::from_str(response).context("failed to deserialize response")
502                    }
503                    Err(error) => Err(anyhow!("{}", error.message)),
504                };
505                let _ = tx.send(response);
506            }),
507        );
508
509        async move {
510            send?;
511            rx.await?
512        }
513    }
514
515    pub fn notify<T: notification::Notification>(&self, params: T::Params) -> Result<()> {
516        Self::notify_internal::<T>(&self.outbound_tx, params)
517    }
518
519    fn notify_internal<T: notification::Notification>(
520        outbound_tx: &channel::Sender<Vec<u8>>,
521        params: T::Params,
522    ) -> Result<()> {
523        let message = serde_json::to_vec(&Notification {
524            jsonrpc: JSON_RPC_VERSION,
525            method: T::METHOD,
526            params,
527        })
528        .unwrap();
529        outbound_tx.try_send(message)?;
530        Ok(())
531    }
532}
533
534impl Drop for LanguageServer {
535    fn drop(&mut self) {
536        if let Some(shutdown) = self.shutdown() {
537            self.executor.spawn(shutdown).detach();
538        }
539    }
540}
541
542impl Subscription {
543    pub fn detach(mut self) {
544        self.method = "";
545    }
546}
547
548impl Drop for Subscription {
549    fn drop(&mut self) {
550        self.notification_handlers.write().remove(self.method);
551    }
552}
553
554#[cfg(any(test, feature = "test-support"))]
555pub struct FakeLanguageServer {
556    handlers: FakeLanguageServerHandlers,
557    outgoing_tx: futures::channel::mpsc::UnboundedSender<Vec<u8>>,
558    incoming_rx: futures::channel::mpsc::UnboundedReceiver<Vec<u8>>,
559    _input_task: Task<Result<()>>,
560    _output_task: Task<Result<()>>,
561}
562
563#[cfg(any(test, feature = "test-support"))]
564type FakeLanguageServerHandlers = Arc<
565    Mutex<
566        HashMap<
567            &'static str,
568            Box<
569                dyn Send
570                    + FnMut(
571                        usize,
572                        &[u8],
573                        gpui::AsyncAppContext,
574                    ) -> futures::future::BoxFuture<'static, Vec<u8>>,
575            >,
576        >,
577    >,
578>;
579
580#[cfg(any(test, feature = "test-support"))]
581impl LanguageServer {
582    pub fn full_capabilities() -> ServerCapabilities {
583        ServerCapabilities {
584            document_highlight_provider: Some(OneOf::Left(true)),
585            code_action_provider: Some(CodeActionProviderCapability::Simple(true)),
586            document_formatting_provider: Some(OneOf::Left(true)),
587            document_range_formatting_provider: Some(OneOf::Left(true)),
588            ..Default::default()
589        }
590    }
591
592    pub fn fake(cx: &mut gpui::MutableAppContext) -> (Self, FakeLanguageServer) {
593        Self::fake_with_capabilities(Self::full_capabilities(), cx)
594    }
595
596    pub fn fake_with_capabilities(
597        capabilities: ServerCapabilities,
598        cx: &mut gpui::MutableAppContext,
599    ) -> (Self, FakeLanguageServer) {
600        let (stdin_writer, stdin_reader) = async_pipe::pipe();
601        let (stdout_writer, stdout_reader) = async_pipe::pipe();
602
603        let mut fake = FakeLanguageServer::new(stdin_reader, stdout_writer, cx);
604        fake.handle_request::<request::Initialize, _, _>({
605            let capabilities = capabilities.clone();
606            move |_, _| {
607                let capabilities = capabilities.clone();
608                async move {
609                    InitializeResult {
610                        capabilities,
611                        ..Default::default()
612                    }
613                }
614            }
615        });
616
617        let executor = cx.background().clone();
618        let server = Self::new_internal(
619            0,
620            stdin_writer,
621            stdout_reader,
622            Path::new("/"),
623            None,
624            executor,
625        );
626        (server, fake)
627    }
628}
629
630#[cfg(any(test, feature = "test-support"))]
631impl FakeLanguageServer {
632    fn new(
633        stdin: async_pipe::PipeReader,
634        stdout: async_pipe::PipeWriter,
635        cx: &mut gpui::MutableAppContext,
636    ) -> Self {
637        use futures::StreamExt as _;
638
639        let (incoming_tx, incoming_rx) = futures::channel::mpsc::unbounded();
640        let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded();
641        let handlers = FakeLanguageServerHandlers::default();
642
643        let input_task = cx.spawn(|cx| {
644            let handlers = handlers.clone();
645            let outgoing_tx = outgoing_tx.clone();
646            async move {
647                let mut buffer = Vec::new();
648                let mut stdin = smol::io::BufReader::new(stdin);
649                while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
650                    cx.background().simulate_random_delay().await;
651
652                    if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
653                        assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
654
655                        let response;
656                        if let Some(handler) = handlers.lock().get_mut(request.method) {
657                            response =
658                                handler(request.id, request.params.get().as_bytes(), cx.clone())
659                                    .await;
660                            log::debug!("handled lsp request. method:{}", request.method);
661                        } else {
662                            response = serde_json::to_vec(&AnyResponse {
663                                id: request.id,
664                                error: Some(Error {
665                                    message: "no handler".to_string(),
666                                }),
667                                result: None,
668                            })
669                            .unwrap();
670                            log::debug!("unhandled lsp request. method:{}", request.method);
671                        }
672                        outgoing_tx.unbounded_send(response)?;
673                    } else {
674                        incoming_tx.unbounded_send(buffer.clone())?;
675                    }
676                }
677                Ok::<_, anyhow::Error>(())
678            }
679        });
680
681        let output_task = cx.background().spawn(async move {
682            let mut stdout = smol::io::BufWriter::new(stdout);
683            while let Some(message) = outgoing_rx.next().await {
684                stdout.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
685                stdout
686                    .write_all((format!("{}", message.len())).as_bytes())
687                    .await?;
688                stdout.write_all("\r\n\r\n".as_bytes()).await?;
689                stdout.write_all(&message).await?;
690                stdout.flush().await?;
691            }
692            Ok(())
693        });
694
695        Self {
696            outgoing_tx,
697            incoming_rx,
698            handlers,
699            _input_task: input_task,
700            _output_task: output_task,
701        }
702    }
703
704    pub fn notify<T: notification::Notification>(&mut self, params: T::Params) {
705        let message = serde_json::to_vec(&Notification {
706            jsonrpc: JSON_RPC_VERSION,
707            method: T::METHOD,
708            params,
709        })
710        .unwrap();
711        self.outgoing_tx.unbounded_send(message).unwrap();
712    }
713
714    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
715        use futures::StreamExt as _;
716
717        loop {
718            let bytes = self.incoming_rx.next().await.unwrap();
719            if let Ok(notification) = serde_json::from_slice::<Notification<T::Params>>(&bytes) {
720                assert_eq!(notification.method, T::METHOD);
721                return notification.params;
722            } else {
723                log::info!(
724                    "skipping message in fake language server {:?}",
725                    std::str::from_utf8(&bytes)
726                );
727            }
728        }
729    }
730
731    pub fn handle_request<T, F, Fut>(
732        &mut self,
733        mut handler: F,
734    ) -> futures::channel::mpsc::UnboundedReceiver<()>
735    where
736        T: 'static + request::Request,
737        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> Fut,
738        Fut: 'static + Send + Future<Output = T::Result>,
739    {
740        use futures::FutureExt as _;
741
742        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
743        self.handlers.lock().insert(
744            T::METHOD,
745            Box::new(move |id, params, cx| {
746                let result = handler(serde_json::from_slice::<T::Params>(params).unwrap(), cx);
747                let responded_tx = responded_tx.clone();
748                async move {
749                    let result = result.await;
750                    let result = serde_json::to_string(&result).unwrap();
751                    let result = serde_json::from_str::<&RawValue>(&result).unwrap();
752                    let response = AnyResponse {
753                        id,
754                        error: None,
755                        result: Some(result),
756                    };
757                    responded_tx.unbounded_send(()).ok();
758                    serde_json::to_vec(&response).unwrap()
759                }
760                .boxed()
761            }),
762        );
763        responded_rx
764    }
765
766    pub fn remove_request_handler<T>(&mut self)
767    where
768        T: 'static + request::Request,
769    {
770        self.handlers.lock().remove(T::METHOD);
771    }
772
773    pub async fn start_progress(&mut self, token: impl Into<String>) {
774        self.notify::<notification::Progress>(ProgressParams {
775            token: NumberOrString::String(token.into()),
776            value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
777        });
778    }
779
780    pub async fn end_progress(&mut self, token: impl Into<String>) {
781        self.notify::<notification::Progress>(ProgressParams {
782            token: NumberOrString::String(token.into()),
783            value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
784        });
785    }
786
787    async fn receive(
788        stdin: &mut smol::io::BufReader<async_pipe::PipeReader>,
789        buffer: &mut Vec<u8>,
790    ) -> Result<()> {
791        buffer.clear();
792        stdin.read_until(b'\n', buffer).await?;
793        stdin.read_until(b'\n', buffer).await?;
794        let message_len: usize = std::str::from_utf8(buffer)
795            .unwrap()
796            .strip_prefix(CONTENT_LEN_HEADER)
797            .ok_or_else(|| anyhow!("invalid content length header"))?
798            .trim_end()
799            .parse()
800            .unwrap();
801        buffer.resize(message_len, 0);
802        stdin.read_exact(buffer).await?;
803        Ok(())
804    }
805}
806
807struct ClearResponseHandlers(Arc<Mutex<HashMap<usize, ResponseHandler>>>);
808
809impl Drop for ClearResponseHandlers {
810    fn drop(&mut self) {
811        self.0.lock().clear();
812    }
813}
814
815#[cfg(test)]
816mod tests {
817    use super::*;
818    use gpui::TestAppContext;
819
820    #[ctor::ctor]
821    fn init_logger() {
822        if std::env::var("RUST_LOG").is_ok() {
823            env_logger::init();
824        }
825    }
826
827    #[gpui::test]
828    async fn test_fake(cx: &mut TestAppContext) {
829        let (mut server, mut fake) = cx.update(LanguageServer::fake);
830
831        let (message_tx, message_rx) = channel::unbounded();
832        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
833        server
834            .on_notification::<notification::ShowMessage, _>(move |params| {
835                message_tx.try_send(params).unwrap()
836            })
837            .detach();
838        server
839            .on_notification::<notification::PublishDiagnostics, _>(move |params| {
840                diagnostics_tx.try_send(params).unwrap()
841            })
842            .detach();
843
844        let server = server.initialize().await.unwrap();
845        server
846            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
847                text_document: TextDocumentItem::new(
848                    Url::from_str("file://a/b").unwrap(),
849                    "rust".to_string(),
850                    0,
851                    "".to_string(),
852                ),
853            })
854            .unwrap();
855        assert_eq!(
856            fake.receive_notification::<notification::DidOpenTextDocument>()
857                .await
858                .text_document
859                .uri
860                .as_str(),
861            "file://a/b"
862        );
863
864        fake.notify::<notification::ShowMessage>(ShowMessageParams {
865            typ: MessageType::ERROR,
866            message: "ok".to_string(),
867        });
868        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
869            uri: Url::from_str("file://b/c").unwrap(),
870            version: Some(5),
871            diagnostics: vec![],
872        });
873        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
874        assert_eq!(
875            diagnostics_rx.recv().await.unwrap().uri.as_str(),
876            "file://b/c"
877        );
878
879        fake.handle_request::<request::Shutdown, _, _>(|_, _| async move {});
880
881        drop(server);
882        fake.receive_notification::<notification::Exit>().await;
883    }
884}