lsp.rs

  1use anyhow::{anyhow, Context, Result};
  2use collections::HashMap;
  3use futures::{channel::oneshot, io::BufWriter, AsyncRead, AsyncWrite};
  4use gpui::{executor, Task};
  5use parking_lot::{Mutex, RwLock};
  6use postage::{barrier, prelude::Stream};
  7use serde::{de::DeserializeOwned, Deserialize, Serialize};
  8use serde_json::{json, value::RawValue, Value};
  9use smol::{
 10    channel,
 11    io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
 12    process::Command,
 13};
 14use std::{
 15    future::Future,
 16    io::Write,
 17    path::PathBuf,
 18    str::FromStr,
 19    sync::{
 20        atomic::{AtomicUsize, Ordering::SeqCst},
 21        Arc,
 22    },
 23};
 24use std::{path::Path, process::Stdio};
 25use util::TryFutureExt;
 26
 27pub use lsp_types::*;
 28
 29const JSON_RPC_VERSION: &'static str = "2.0";
 30const CONTENT_LEN_HEADER: &'static str = "Content-Length: ";
 31
 32type NotificationHandler =
 33    Box<dyn Send + Sync + FnMut(Option<usize>, &str, &mut channel::Sender<Vec<u8>>) -> Result<()>>;
 34type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
 35
 36pub struct LanguageServer {
 37    next_id: AtomicUsize,
 38    outbound_tx: channel::Sender<Vec<u8>>,
 39    name: String,
 40    capabilities: ServerCapabilities,
 41    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
 42    response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
 43    executor: Arc<executor::Background>,
 44    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
 45    output_done_rx: Mutex<Option<barrier::Receiver>>,
 46    root_path: PathBuf,
 47    options: Option<Value>,
 48}
 49
 50pub struct Subscription {
 51    method: &'static str,
 52    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
 53}
 54
 55#[derive(Serialize, Deserialize)]
 56struct Request<'a, T> {
 57    jsonrpc: &'a str,
 58    id: usize,
 59    method: &'a str,
 60    params: T,
 61}
 62
 63#[cfg(any(test, feature = "test-support"))]
 64#[derive(Deserialize)]
 65struct AnyRequest<'a> {
 66    id: usize,
 67    #[serde(borrow)]
 68    jsonrpc: &'a str,
 69    #[serde(borrow)]
 70    method: &'a str,
 71    #[serde(borrow)]
 72    params: &'a RawValue,
 73}
 74
 75#[derive(Serialize, Deserialize)]
 76struct AnyResponse<'a> {
 77    id: usize,
 78    #[serde(default)]
 79    error: Option<Error>,
 80    #[serde(borrow)]
 81    result: Option<&'a RawValue>,
 82}
 83
 84#[derive(Serialize)]
 85struct Response<T> {
 86    id: usize,
 87    result: T,
 88}
 89
 90#[derive(Serialize, Deserialize)]
 91struct Notification<'a, T> {
 92    #[serde(borrow)]
 93    jsonrpc: &'a str,
 94    #[serde(borrow)]
 95    method: &'a str,
 96    params: T,
 97}
 98
 99#[derive(Deserialize)]
100struct AnyNotification<'a> {
101    #[serde(default)]
102    id: Option<usize>,
103    #[serde(borrow)]
104    method: &'a str,
105    #[serde(borrow)]
106    params: &'a RawValue,
107}
108
109#[derive(Debug, Serialize, Deserialize)]
110struct Error {
111    message: String,
112}
113
114impl LanguageServer {
115    pub fn new(
116        binary_path: &Path,
117        args: &[&str],
118        root_path: &Path,
119        options: Option<Value>,
120        background: Arc<executor::Background>,
121    ) -> Result<Self> {
122        let working_dir = if root_path.is_dir() {
123            root_path
124        } else {
125            root_path.parent().unwrap_or(Path::new("/"))
126        };
127        let mut server = Command::new(binary_path)
128            .current_dir(working_dir)
129            .args(args)
130            .stdin(Stdio::piped())
131            .stdout(Stdio::piped())
132            .stderr(Stdio::inherit())
133            .spawn()?;
134        let stdin = server.stdin.take().unwrap();
135        let stdout = server.stdout.take().unwrap();
136        let mut server = Self::new_internal(stdin, stdout, root_path, options, background);
137        if let Some(name) = binary_path.file_name() {
138            server.name = name.to_string_lossy().to_string();
139        }
140        Ok(server)
141    }
142
143    fn new_internal<Stdin, Stdout>(
144        stdin: Stdin,
145        stdout: Stdout,
146        root_path: &Path,
147        options: Option<Value>,
148        executor: Arc<executor::Background>,
149    ) -> Self
150    where
151        Stdin: AsyncWrite + Unpin + Send + 'static,
152        Stdout: AsyncRead + Unpin + Send + 'static,
153    {
154        let mut stdin = BufWriter::new(stdin);
155        let mut stdout = BufReader::new(stdout);
156        let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
157        let notification_handlers =
158            Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::default()));
159        let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::default()));
160        let input_task = executor.spawn(
161            {
162                let notification_handlers = notification_handlers.clone();
163                let response_handlers = response_handlers.clone();
164                let mut outbound_tx = outbound_tx.clone();
165                async move {
166                    let _clear_response_handlers = ClearResponseHandlers(response_handlers.clone());
167                    let mut buffer = Vec::new();
168                    loop {
169                        buffer.clear();
170                        stdout.read_until(b'\n', &mut buffer).await?;
171                        stdout.read_until(b'\n', &mut buffer).await?;
172                        let message_len: usize = std::str::from_utf8(&buffer)?
173                            .strip_prefix(CONTENT_LEN_HEADER)
174                            .ok_or_else(|| anyhow!("invalid header"))?
175                            .trim_end()
176                            .parse()?;
177
178                        buffer.resize(message_len, 0);
179                        stdout.read_exact(&mut buffer).await?;
180
181                        if let Ok(AnyNotification { id, method, params }) =
182                            serde_json::from_slice(&buffer)
183                        {
184                            if let Some(handler) = notification_handlers.write().get_mut(method) {
185                                if let Err(e) = handler(id, params.get(), &mut outbound_tx) {
186                                    log::error!("error handling {} message: {:?}", method, e);
187                                }
188                            } else {
189                                log::info!(
190                                    "unhandled notification {}:\n{}",
191                                    method,
192                                    serde_json::to_string_pretty(
193                                        &Value::from_str(params.get()).unwrap()
194                                    )
195                                    .unwrap()
196                                );
197                            }
198                        } else if let Ok(AnyResponse { id, error, result }) =
199                            serde_json::from_slice(&buffer)
200                        {
201                            if let Some(handler) = response_handlers.lock().remove(&id) {
202                                if let Some(error) = error {
203                                    handler(Err(error));
204                                } else if let Some(result) = result {
205                                    handler(Ok(result.get()));
206                                } else {
207                                    handler(Ok("null"));
208                                }
209                            }
210                        } else {
211                            return Err(anyhow!(
212                                "failed to deserialize message:\n{}",
213                                std::str::from_utf8(&buffer)?
214                            ));
215                        }
216                    }
217                }
218            }
219            .log_err(),
220        );
221        let (output_done_tx, output_done_rx) = barrier::channel();
222        let output_task = executor.spawn({
223            let response_handlers = response_handlers.clone();
224            async move {
225                let _clear_response_handlers = ClearResponseHandlers(response_handlers);
226                let mut content_len_buffer = Vec::new();
227                while let Ok(message) = outbound_rx.recv().await {
228                    content_len_buffer.clear();
229                    write!(content_len_buffer, "{}", message.len()).unwrap();
230                    stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
231                    stdin.write_all(&content_len_buffer).await?;
232                    stdin.write_all("\r\n\r\n".as_bytes()).await?;
233                    stdin.write_all(&message).await?;
234                    stdin.flush().await?;
235                }
236                drop(output_done_tx);
237                Ok(())
238            }
239            .log_err()
240        });
241
242        Self {
243            notification_handlers,
244            response_handlers,
245            name: Default::default(),
246            capabilities: Default::default(),
247            next_id: Default::default(),
248            outbound_tx,
249            executor: executor.clone(),
250            io_tasks: Mutex::new(Some((input_task, output_task))),
251            output_done_rx: Mutex::new(Some(output_done_rx)),
252            root_path: root_path.to_path_buf(),
253            options,
254        }
255    }
256
257    pub async fn initialize(mut self) -> Result<Arc<Self>> {
258        let options = self.options.take();
259        let mut this = Arc::new(self);
260        let root_uri = Url::from_file_path(&this.root_path).unwrap();
261        #[allow(deprecated)]
262        let params = InitializeParams {
263            process_id: Default::default(),
264            root_path: Default::default(),
265            root_uri: Some(root_uri),
266            initialization_options: options,
267            capabilities: ClientCapabilities {
268                workspace: Some(WorkspaceClientCapabilities {
269                    configuration: Some(true),
270                    did_change_configuration: Some(DynamicRegistrationClientCapabilities {
271                        dynamic_registration: Some(true),
272                    }),
273                    ..Default::default()
274                }),
275                text_document: Some(TextDocumentClientCapabilities {
276                    definition: Some(GotoCapability {
277                        link_support: Some(true),
278                        ..Default::default()
279                    }),
280                    code_action: Some(CodeActionClientCapabilities {
281                        code_action_literal_support: Some(CodeActionLiteralSupport {
282                            code_action_kind: CodeActionKindLiteralSupport {
283                                value_set: vec![
284                                    CodeActionKind::REFACTOR.as_str().into(),
285                                    CodeActionKind::QUICKFIX.as_str().into(),
286                                ],
287                            },
288                        }),
289                        data_support: Some(true),
290                        resolve_support: Some(CodeActionCapabilityResolveSupport {
291                            properties: vec!["edit".to_string()],
292                        }),
293                        ..Default::default()
294                    }),
295                    completion: Some(CompletionClientCapabilities {
296                        completion_item: Some(CompletionItemCapability {
297                            snippet_support: Some(true),
298                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
299                                properties: vec!["additionalTextEdits".to_string()],
300                            }),
301                            ..Default::default()
302                        }),
303                        ..Default::default()
304                    }),
305                    ..Default::default()
306                }),
307                experimental: Some(json!({
308                    "serverStatusNotification": true,
309                })),
310                window: Some(WindowClientCapabilities {
311                    work_done_progress: Some(true),
312                    ..Default::default()
313                }),
314                ..Default::default()
315            },
316            trace: Default::default(),
317            workspace_folders: Default::default(),
318            client_info: Default::default(),
319            locale: Default::default(),
320        };
321
322        let response = this.request::<request::Initialize>(params).await?;
323        {
324            let this = Arc::get_mut(&mut this).unwrap();
325            if let Some(info) = response.server_info {
326                this.name = info.name;
327            }
328            this.capabilities = response.capabilities;
329        }
330        this.notify::<notification::Initialized>(InitializedParams {})?;
331        Ok(this)
332    }
333
334    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
335        if let Some(tasks) = self.io_tasks.lock().take() {
336            let response_handlers = self.response_handlers.clone();
337            let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
338            let outbound_tx = self.outbound_tx.clone();
339            let mut output_done = self.output_done_rx.lock().take().unwrap();
340            let shutdown_request = Self::request_internal::<request::Shutdown>(
341                &next_id,
342                &response_handlers,
343                &outbound_tx,
344                (),
345            );
346            let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
347            outbound_tx.close();
348            Some(
349                async move {
350                    log::debug!("language server shutdown started");
351                    shutdown_request.await?;
352                    response_handlers.lock().clear();
353                    exit?;
354                    output_done.recv().await;
355                    log::debug!("language server shutdown finished");
356                    drop(tasks);
357                    Ok(())
358                }
359                .log_err(),
360            )
361        } else {
362            None
363        }
364    }
365
366    pub fn on_notification<T, F>(&mut self, f: F) -> Subscription
367    where
368        T: notification::Notification,
369        F: 'static + Send + Sync + FnMut(T::Params),
370    {
371        self.on_custom_notification(T::METHOD, f)
372    }
373
374    pub fn on_request<T, F>(&mut self, f: F) -> Subscription
375    where
376        T: request::Request,
377        F: 'static + Send + Sync + FnMut(T::Params) -> Result<T::Result>,
378    {
379        self.on_custom_request(T::METHOD, f)
380    }
381
382    pub fn on_custom_notification<Params, F>(
383        &mut self,
384        method: &'static str,
385        mut f: F,
386    ) -> Subscription
387    where
388        F: 'static + Send + Sync + FnMut(Params),
389        Params: DeserializeOwned,
390    {
391        let prev_handler = self.notification_handlers.write().insert(
392            method,
393            Box::new(move |_, params, _| {
394                let params = serde_json::from_str(params)?;
395                f(params);
396                Ok(())
397            }),
398        );
399        assert!(
400            prev_handler.is_none(),
401            "registered multiple handlers for the same LSP method"
402        );
403        Subscription {
404            method,
405            notification_handlers: self.notification_handlers.clone(),
406        }
407    }
408
409    pub fn on_custom_request<Params, Res, F>(
410        &mut self,
411        method: &'static str,
412        mut f: F,
413    ) -> Subscription
414    where
415        F: 'static + Send + Sync + FnMut(Params) -> Result<Res>,
416        Params: DeserializeOwned,
417        Res: Serialize,
418    {
419        let prev_handler = self.notification_handlers.write().insert(
420            method,
421            Box::new(move |id, params, tx| {
422                if let Some(id) = id {
423                    let params = serde_json::from_str(params)?;
424                    let result = f(params)?;
425                    let response = serde_json::to_vec(&Response { id, result })?;
426                    tx.try_send(response)?;
427                }
428                Ok(())
429            }),
430        );
431        assert!(
432            prev_handler.is_none(),
433            "registered multiple handlers for the same LSP method"
434        );
435        Subscription {
436            method,
437            notification_handlers: self.notification_handlers.clone(),
438        }
439    }
440
441    pub fn name<'a>(self: &'a Arc<Self>) -> &'a str {
442        &self.name
443    }
444
445    pub fn capabilities<'a>(self: &'a Arc<Self>) -> &'a ServerCapabilities {
446        &self.capabilities
447    }
448
449    pub fn request<T: request::Request>(
450        self: &Arc<Self>,
451        params: T::Params,
452    ) -> impl Future<Output = Result<T::Result>>
453    where
454        T::Result: 'static + Send,
455    {
456        Self::request_internal::<T>(
457            &self.next_id,
458            &self.response_handlers,
459            &self.outbound_tx,
460            params,
461        )
462    }
463
464    fn request_internal<T: request::Request>(
465        next_id: &AtomicUsize,
466        response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
467        outbound_tx: &channel::Sender<Vec<u8>>,
468        params: T::Params,
469    ) -> impl 'static + Future<Output = Result<T::Result>>
470    where
471        T::Result: 'static + Send,
472    {
473        let id = next_id.fetch_add(1, SeqCst);
474        let message = serde_json::to_vec(&Request {
475            jsonrpc: JSON_RPC_VERSION,
476            id,
477            method: T::METHOD,
478            params,
479        })
480        .unwrap();
481
482        let send = outbound_tx
483            .try_send(message)
484            .context("failed to write to language server's stdin");
485
486        let (tx, rx) = oneshot::channel();
487        response_handlers.lock().insert(
488            id,
489            Box::new(move |result| {
490                let response = match result {
491                    Ok(response) => {
492                        serde_json::from_str(response).context("failed to deserialize response")
493                    }
494                    Err(error) => Err(anyhow!("{}", error.message)),
495                };
496                let _ = tx.send(response);
497            }),
498        );
499
500        async move {
501            send?;
502            rx.await?
503        }
504    }
505
506    pub fn notify<T: notification::Notification>(&self, params: T::Params) -> Result<()> {
507        Self::notify_internal::<T>(&self.outbound_tx, params)
508    }
509
510    fn notify_internal<T: notification::Notification>(
511        outbound_tx: &channel::Sender<Vec<u8>>,
512        params: T::Params,
513    ) -> Result<()> {
514        let message = serde_json::to_vec(&Notification {
515            jsonrpc: JSON_RPC_VERSION,
516            method: T::METHOD,
517            params,
518        })
519        .unwrap();
520        outbound_tx.try_send(message)?;
521        Ok(())
522    }
523}
524
525impl Drop for LanguageServer {
526    fn drop(&mut self) {
527        if let Some(shutdown) = self.shutdown() {
528            self.executor.spawn(shutdown).detach();
529        }
530    }
531}
532
533impl Subscription {
534    pub fn detach(mut self) {
535        self.method = "";
536    }
537}
538
539impl Drop for Subscription {
540    fn drop(&mut self) {
541        self.notification_handlers.write().remove(self.method);
542    }
543}
544
545#[cfg(any(test, feature = "test-support"))]
546pub struct FakeLanguageServer {
547    handlers: FakeLanguageServerHandlers,
548    outgoing_tx: futures::channel::mpsc::UnboundedSender<Vec<u8>>,
549    incoming_rx: futures::channel::mpsc::UnboundedReceiver<Vec<u8>>,
550    _input_task: Task<Result<()>>,
551    _output_task: Task<Result<()>>,
552}
553
554#[cfg(any(test, feature = "test-support"))]
555type FakeLanguageServerHandlers = Arc<
556    Mutex<
557        HashMap<
558            &'static str,
559            Box<
560                dyn Send
561                    + FnMut(
562                        usize,
563                        &[u8],
564                        gpui::AsyncAppContext,
565                    ) -> futures::future::BoxFuture<'static, Vec<u8>>,
566            >,
567        >,
568    >,
569>;
570
571#[cfg(any(test, feature = "test-support"))]
572impl LanguageServer {
573    pub fn full_capabilities() -> ServerCapabilities {
574        ServerCapabilities {
575            document_highlight_provider: Some(OneOf::Left(true)),
576            code_action_provider: Some(CodeActionProviderCapability::Simple(true)),
577            document_formatting_provider: Some(OneOf::Left(true)),
578            document_range_formatting_provider: Some(OneOf::Left(true)),
579            ..Default::default()
580        }
581    }
582
583    pub fn fake(cx: &mut gpui::MutableAppContext) -> (Self, FakeLanguageServer) {
584        Self::fake_with_capabilities(Self::full_capabilities(), cx)
585    }
586
587    pub fn fake_with_capabilities(
588        capabilities: ServerCapabilities,
589        cx: &mut gpui::MutableAppContext,
590    ) -> (Self, FakeLanguageServer) {
591        let (stdin_writer, stdin_reader) = async_pipe::pipe();
592        let (stdout_writer, stdout_reader) = async_pipe::pipe();
593
594        let mut fake = FakeLanguageServer::new(stdin_reader, stdout_writer, cx);
595        fake.handle_request::<request::Initialize, _, _>({
596            let capabilities = capabilities.clone();
597            move |_, _| {
598                let capabilities = capabilities.clone();
599                async move {
600                    InitializeResult {
601                        capabilities,
602                        ..Default::default()
603                    }
604                }
605            }
606        });
607
608        let executor = cx.background().clone();
609        let server =
610            Self::new_internal(stdin_writer, stdout_reader, Path::new("/"), None, executor);
611        (server, fake)
612    }
613}
614
615#[cfg(any(test, feature = "test-support"))]
616impl FakeLanguageServer {
617    fn new(
618        stdin: async_pipe::PipeReader,
619        stdout: async_pipe::PipeWriter,
620        cx: &mut gpui::MutableAppContext,
621    ) -> Self {
622        use futures::StreamExt as _;
623
624        let (incoming_tx, incoming_rx) = futures::channel::mpsc::unbounded();
625        let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded();
626        let handlers = FakeLanguageServerHandlers::default();
627
628        let input_task = cx.spawn(|cx| {
629            let handlers = handlers.clone();
630            let outgoing_tx = outgoing_tx.clone();
631            async move {
632                let mut buffer = Vec::new();
633                let mut stdin = smol::io::BufReader::new(stdin);
634                while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
635                    cx.background().simulate_random_delay().await;
636
637                    if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
638                        assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
639
640                        let response;
641                        if let Some(handler) = handlers.lock().get_mut(request.method) {
642                            response =
643                                handler(request.id, request.params.get().as_bytes(), cx.clone())
644                                    .await;
645                            log::debug!("handled lsp request. method:{}", request.method);
646                        } else {
647                            response = serde_json::to_vec(&AnyResponse {
648                                id: request.id,
649                                error: Some(Error {
650                                    message: "no handler".to_string(),
651                                }),
652                                result: None,
653                            })
654                            .unwrap();
655                            log::debug!("unhandled lsp request. method:{}", request.method);
656                        }
657                        outgoing_tx.unbounded_send(response)?;
658                    } else {
659                        incoming_tx.unbounded_send(buffer.clone())?;
660                    }
661                }
662                Ok::<_, anyhow::Error>(())
663            }
664        });
665
666        let output_task = cx.background().spawn(async move {
667            let mut stdout = smol::io::BufWriter::new(stdout);
668            while let Some(message) = outgoing_rx.next().await {
669                stdout
670                    .write_all(CONTENT_LEN_HEADER.as_bytes())
671                    .await
672                    .unwrap();
673                stdout
674                    .write_all((format!("{}", message.len())).as_bytes())
675                    .await
676                    .unwrap();
677                stdout.write_all("\r\n\r\n".as_bytes()).await.unwrap();
678                stdout.write_all(&message).await.unwrap();
679                stdout.flush().await.unwrap();
680            }
681            Ok(())
682        });
683
684        Self {
685            outgoing_tx,
686            incoming_rx,
687            handlers,
688            _input_task: input_task,
689            _output_task: output_task,
690        }
691    }
692
693    pub fn notify<T: notification::Notification>(&mut self, params: T::Params) {
694        let message = serde_json::to_vec(&Notification {
695            jsonrpc: JSON_RPC_VERSION,
696            method: T::METHOD,
697            params,
698        })
699        .unwrap();
700        self.outgoing_tx.unbounded_send(message).unwrap();
701    }
702
703    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
704        use futures::StreamExt as _;
705
706        loop {
707            let bytes = self.incoming_rx.next().await.unwrap();
708            if let Ok(notification) = serde_json::from_slice::<Notification<T::Params>>(&bytes) {
709                assert_eq!(notification.method, T::METHOD);
710                return notification.params;
711            } else {
712                log::info!(
713                    "skipping message in fake language server {:?}",
714                    std::str::from_utf8(&bytes)
715                );
716            }
717        }
718    }
719
720    pub fn handle_request<T, F, Fut>(
721        &mut self,
722        mut handler: F,
723    ) -> futures::channel::mpsc::UnboundedReceiver<()>
724    where
725        T: 'static + request::Request,
726        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> Fut,
727        Fut: 'static + Send + Future<Output = T::Result>,
728    {
729        use futures::FutureExt as _;
730
731        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
732        self.handlers.lock().insert(
733            T::METHOD,
734            Box::new(move |id, params, cx| {
735                let result = handler(serde_json::from_slice::<T::Params>(params).unwrap(), cx);
736                let responded_tx = responded_tx.clone();
737                async move {
738                    let result = result.await;
739                    let result = serde_json::to_string(&result).unwrap();
740                    let result = serde_json::from_str::<&RawValue>(&result).unwrap();
741                    let response = AnyResponse {
742                        id,
743                        error: None,
744                        result: Some(result),
745                    };
746                    responded_tx.unbounded_send(()).ok();
747                    serde_json::to_vec(&response).unwrap()
748                }
749                .boxed()
750            }),
751        );
752        responded_rx
753    }
754
755    pub fn remove_request_handler<T>(&mut self)
756    where
757        T: 'static + request::Request,
758    {
759        self.handlers.lock().remove(T::METHOD);
760    }
761
762    pub async fn start_progress(&mut self, token: impl Into<String>) {
763        self.notify::<notification::Progress>(ProgressParams {
764            token: NumberOrString::String(token.into()),
765            value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
766        });
767    }
768
769    pub async fn end_progress(&mut self, token: impl Into<String>) {
770        self.notify::<notification::Progress>(ProgressParams {
771            token: NumberOrString::String(token.into()),
772            value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
773        });
774    }
775
776    async fn receive(
777        stdin: &mut smol::io::BufReader<async_pipe::PipeReader>,
778        buffer: &mut Vec<u8>,
779    ) -> Result<()> {
780        buffer.clear();
781        stdin.read_until(b'\n', buffer).await?;
782        stdin.read_until(b'\n', buffer).await?;
783        let message_len: usize = std::str::from_utf8(buffer)
784            .unwrap()
785            .strip_prefix(CONTENT_LEN_HEADER)
786            .ok_or_else(|| anyhow!("invalid content length header"))?
787            .trim_end()
788            .parse()
789            .unwrap();
790        buffer.resize(message_len, 0);
791        stdin.read_exact(buffer).await?;
792        Ok(())
793    }
794}
795
796struct ClearResponseHandlers(Arc<Mutex<HashMap<usize, ResponseHandler>>>);
797
798impl Drop for ClearResponseHandlers {
799    fn drop(&mut self) {
800        self.0.lock().clear();
801    }
802}
803
804#[cfg(test)]
805mod tests {
806    use super::*;
807    use gpui::TestAppContext;
808
809    #[ctor::ctor]
810    fn init_logger() {
811        if std::env::var("RUST_LOG").is_ok() {
812            env_logger::init();
813        }
814    }
815
816    #[gpui::test]
817    async fn test_fake(cx: &mut TestAppContext) {
818        let (mut server, mut fake) = cx.update(LanguageServer::fake);
819
820        let (message_tx, message_rx) = channel::unbounded();
821        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
822        server
823            .on_notification::<notification::ShowMessage, _>(move |params| {
824                message_tx.try_send(params).unwrap()
825            })
826            .detach();
827        server
828            .on_notification::<notification::PublishDiagnostics, _>(move |params| {
829                diagnostics_tx.try_send(params).unwrap()
830            })
831            .detach();
832
833        let server = server.initialize().await.unwrap();
834        server
835            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
836                text_document: TextDocumentItem::new(
837                    Url::from_str("file://a/b").unwrap(),
838                    "rust".to_string(),
839                    0,
840                    "".to_string(),
841                ),
842            })
843            .unwrap();
844        assert_eq!(
845            fake.receive_notification::<notification::DidOpenTextDocument>()
846                .await
847                .text_document
848                .uri
849                .as_str(),
850            "file://a/b"
851        );
852
853        fake.notify::<notification::ShowMessage>(ShowMessageParams {
854            typ: MessageType::ERROR,
855            message: "ok".to_string(),
856        });
857        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
858            uri: Url::from_str("file://b/c").unwrap(),
859            version: Some(5),
860            diagnostics: vec![],
861        });
862        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
863        assert_eq!(
864            diagnostics_rx.recv().await.unwrap().uri.as_str(),
865            "file://b/c"
866        );
867
868        fake.handle_request::<request::Shutdown, _, _>(|_, _| async move {});
869
870        drop(server);
871        fake.receive_notification::<notification::Exit>().await;
872    }
873}