lsp.rs

  1use anyhow::{anyhow, Context, Result};
  2use collections::HashMap;
  3use futures::{channel::oneshot, io::BufWriter, AsyncRead, AsyncWrite};
  4use gpui::{executor, Task};
  5use parking_lot::{Mutex, RwLock};
  6use postage::{barrier, prelude::Stream};
  7use serde::{de::DeserializeOwned, Deserialize, Serialize};
  8use serde_json::{json, value::RawValue, Value};
  9use smol::{
 10    channel,
 11    io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
 12    process::Command,
 13};
 14use std::{
 15    future::Future,
 16    io::Write,
 17    path::PathBuf,
 18    str::FromStr,
 19    sync::{
 20        atomic::{AtomicUsize, Ordering::SeqCst},
 21        Arc,
 22    },
 23};
 24use std::{path::Path, process::Stdio};
 25use util::TryFutureExt;
 26
 27pub use lsp_types::*;
 28
 29const JSON_RPC_VERSION: &'static str = "2.0";
 30const CONTENT_LEN_HEADER: &'static str = "Content-Length: ";
 31
 32type NotificationHandler =
 33    Box<dyn Send + Sync + FnMut(Option<usize>, &str, &mut channel::Sender<Vec<u8>>) -> Result<()>>;
 34type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
 35
 36pub struct LanguageServer {
 37    next_id: AtomicUsize,
 38    outbound_tx: channel::Sender<Vec<u8>>,
 39    name: String,
 40    capabilities: ServerCapabilities,
 41    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
 42    response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
 43    executor: Arc<executor::Background>,
 44    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
 45    output_done_rx: Mutex<Option<barrier::Receiver>>,
 46    root_path: PathBuf,
 47    options: Option<Value>,
 48}
 49
 50pub struct Subscription {
 51    method: &'static str,
 52    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
 53}
 54
 55#[derive(Serialize, Deserialize)]
 56struct Request<'a, T> {
 57    jsonrpc: &'a str,
 58    id: usize,
 59    method: &'a str,
 60    params: T,
 61}
 62
 63#[cfg(any(test, feature = "test-support"))]
 64#[derive(Deserialize)]
 65struct AnyRequest<'a> {
 66    id: usize,
 67    #[serde(borrow)]
 68    jsonrpc: &'a str,
 69    #[serde(borrow)]
 70    method: &'a str,
 71    #[serde(borrow)]
 72    params: &'a RawValue,
 73}
 74
 75#[derive(Serialize, Deserialize)]
 76struct AnyResponse<'a> {
 77    id: usize,
 78    #[serde(default)]
 79    error: Option<Error>,
 80    #[serde(borrow)]
 81    result: Option<&'a RawValue>,
 82}
 83
 84#[derive(Serialize)]
 85struct Response<T> {
 86    id: usize,
 87    result: T,
 88}
 89
 90#[derive(Serialize, Deserialize)]
 91struct Notification<'a, T> {
 92    #[serde(borrow)]
 93    jsonrpc: &'a str,
 94    #[serde(borrow)]
 95    method: &'a str,
 96    params: T,
 97}
 98
 99#[derive(Deserialize)]
100struct AnyNotification<'a> {
101    #[serde(default)]
102    id: Option<usize>,
103    #[serde(borrow)]
104    method: &'a str,
105    #[serde(borrow)]
106    params: &'a RawValue,
107}
108
109#[derive(Debug, Serialize, Deserialize)]
110struct Error {
111    message: String,
112}
113
114impl LanguageServer {
115    pub fn new(
116        binary_path: &Path,
117        args: &[&str],
118        root_path: &Path,
119        options: Option<Value>,
120        background: Arc<executor::Background>,
121    ) -> Result<Self> {
122        let working_dir = if root_path.is_dir() {
123            root_path
124        } else {
125            root_path.parent().unwrap_or(Path::new("/"))
126        };
127        let mut server = Command::new(binary_path)
128            .current_dir(working_dir)
129            .args(args)
130            .stdin(Stdio::piped())
131            .stdout(Stdio::piped())
132            .stderr(Stdio::inherit())
133            .spawn()?;
134        let stdin = server.stdin.take().unwrap();
135        let stdout = server.stdout.take().unwrap();
136        let mut server = Self::new_internal(stdin, stdout, root_path, options, background);
137        if let Some(name) = binary_path.file_name() {
138            server.name = name.to_string_lossy().to_string();
139        }
140        Ok(server)
141    }
142
143    fn new_internal<Stdin, Stdout>(
144        stdin: Stdin,
145        stdout: Stdout,
146        root_path: &Path,
147        options: Option<Value>,
148        executor: Arc<executor::Background>,
149    ) -> Self
150    where
151        Stdin: AsyncWrite + Unpin + Send + 'static,
152        Stdout: AsyncRead + Unpin + Send + 'static,
153    {
154        let mut stdin = BufWriter::new(stdin);
155        let mut stdout = BufReader::new(stdout);
156        let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
157        let notification_handlers =
158            Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::default()));
159        let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::default()));
160        let input_task = executor.spawn(
161            {
162                let notification_handlers = notification_handlers.clone();
163                let response_handlers = response_handlers.clone();
164                let mut outbound_tx = outbound_tx.clone();
165                async move {
166                    let _clear_response_handlers = ClearResponseHandlers(response_handlers.clone());
167                    let mut buffer = Vec::new();
168                    loop {
169                        buffer.clear();
170                        stdout.read_until(b'\n', &mut buffer).await?;
171                        stdout.read_until(b'\n', &mut buffer).await?;
172                        let message_len: usize = std::str::from_utf8(&buffer)?
173                            .strip_prefix(CONTENT_LEN_HEADER)
174                            .ok_or_else(|| anyhow!("invalid header"))?
175                            .trim_end()
176                            .parse()?;
177
178                        buffer.resize(message_len, 0);
179                        stdout.read_exact(&mut buffer).await?;
180
181                        if let Ok(AnyNotification { id, method, params }) =
182                            serde_json::from_slice(&buffer)
183                        {
184                            if let Some(handler) = notification_handlers.write().get_mut(method) {
185                                if let Err(e) = handler(id, params.get(), &mut outbound_tx) {
186                                    log::error!("error handling {} message: {:?}", method, e);
187                                }
188                            } else {
189                                log::info!(
190                                    "unhandled notification {}:\n{}",
191                                    method,
192                                    serde_json::to_string_pretty(
193                                        &Value::from_str(params.get()).unwrap()
194                                    )
195                                    .unwrap()
196                                );
197                            }
198                        } else if let Ok(AnyResponse { id, error, result }) =
199                            serde_json::from_slice(&buffer)
200                        {
201                            if let Some(handler) = response_handlers.lock().remove(&id) {
202                                if let Some(error) = error {
203                                    handler(Err(error));
204                                } else if let Some(result) = result {
205                                    handler(Ok(result.get()));
206                                } else {
207                                    handler(Ok("null"));
208                                }
209                            }
210                        } else {
211                            return Err(anyhow!(
212                                "failed to deserialize message:\n{}",
213                                std::str::from_utf8(&buffer)?
214                            ));
215                        }
216                    }
217                }
218            }
219            .log_err(),
220        );
221        let (output_done_tx, output_done_rx) = barrier::channel();
222        let output_task = executor.spawn({
223            let response_handlers = response_handlers.clone();
224            async move {
225                let _clear_response_handlers = ClearResponseHandlers(response_handlers);
226                let mut content_len_buffer = Vec::new();
227                while let Ok(message) = outbound_rx.recv().await {
228                    content_len_buffer.clear();
229                    write!(content_len_buffer, "{}", message.len()).unwrap();
230                    stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
231                    stdin.write_all(&content_len_buffer).await?;
232                    stdin.write_all("\r\n\r\n".as_bytes()).await?;
233                    stdin.write_all(&message).await?;
234                    stdin.flush().await?;
235                }
236                drop(output_done_tx);
237                Ok(())
238            }
239            .log_err()
240        });
241
242        Self {
243            notification_handlers,
244            response_handlers,
245            name: Default::default(),
246            capabilities: Default::default(),
247            next_id: Default::default(),
248            outbound_tx,
249            executor: executor.clone(),
250            io_tasks: Mutex::new(Some((input_task, output_task))),
251            output_done_rx: Mutex::new(Some(output_done_rx)),
252            root_path: root_path.to_path_buf(),
253            options,
254        }
255    }
256
257    pub async fn initialize(mut self) -> Result<Arc<Self>> {
258        let options = self.options.take();
259        let mut this = Arc::new(self);
260        let root_uri = Url::from_file_path(&this.root_path).unwrap();
261        #[allow(deprecated)]
262        let params = InitializeParams {
263            process_id: Default::default(),
264            root_path: Default::default(),
265            root_uri: Some(root_uri),
266            initialization_options: options,
267            capabilities: ClientCapabilities {
268                workspace: Some(WorkspaceClientCapabilities {
269                    configuration: Some(true),
270                    did_change_configuration: Some(DynamicRegistrationClientCapabilities {
271                        dynamic_registration: Some(true),
272                    }),
273                    ..Default::default()
274                }),
275                text_document: Some(TextDocumentClientCapabilities {
276                    definition: Some(GotoCapability {
277                        link_support: Some(true),
278                        ..Default::default()
279                    }),
280                    code_action: Some(CodeActionClientCapabilities {
281                        code_action_literal_support: Some(CodeActionLiteralSupport {
282                            code_action_kind: CodeActionKindLiteralSupport {
283                                value_set: vec![
284                                    CodeActionKind::REFACTOR.as_str().into(),
285                                    CodeActionKind::QUICKFIX.as_str().into(),
286                                ],
287                            },
288                        }),
289                        data_support: Some(true),
290                        resolve_support: Some(CodeActionCapabilityResolveSupport {
291                            properties: vec!["edit".to_string()],
292                        }),
293                        ..Default::default()
294                    }),
295                    completion: Some(CompletionClientCapabilities {
296                        completion_item: Some(CompletionItemCapability {
297                            snippet_support: Some(true),
298                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
299                                properties: vec!["additionalTextEdits".to_string()],
300                            }),
301                            ..Default::default()
302                        }),
303                        ..Default::default()
304                    }),
305                    ..Default::default()
306                }),
307                experimental: Some(json!({
308                    "serverStatusNotification": true,
309                })),
310                window: Some(WindowClientCapabilities {
311                    work_done_progress: Some(true),
312                    ..Default::default()
313                }),
314                ..Default::default()
315            },
316            trace: Default::default(),
317            workspace_folders: Default::default(),
318            client_info: Default::default(),
319            locale: Default::default(),
320        };
321
322        let response = this.request::<request::Initialize>(params).await?;
323        {
324            let this = Arc::get_mut(&mut this).unwrap();
325            if let Some(info) = response.server_info {
326                this.name = info.name;
327            }
328            this.capabilities = response.capabilities;
329        }
330        this.notify::<notification::Initialized>(InitializedParams {})?;
331        Ok(this)
332    }
333
334    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
335        if let Some(tasks) = self.io_tasks.lock().take() {
336            let response_handlers = self.response_handlers.clone();
337            let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
338            let outbound_tx = self.outbound_tx.clone();
339            let mut output_done = self.output_done_rx.lock().take().unwrap();
340            let shutdown_request = Self::request_internal::<request::Shutdown>(
341                &next_id,
342                &response_handlers,
343                &outbound_tx,
344                (),
345            );
346            let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
347            outbound_tx.close();
348            Some(
349                async move {
350                    log::debug!("language server shutdown started");
351                    shutdown_request.await?;
352                    response_handlers.lock().clear();
353                    exit?;
354                    output_done.recv().await;
355                    log::debug!("language server shutdown finished");
356                    drop(tasks);
357                    Ok(())
358                }
359                .log_err(),
360            )
361        } else {
362            None
363        }
364    }
365
366    pub fn on_notification<T, F>(&mut self, f: F) -> Subscription
367    where
368        T: notification::Notification,
369        F: 'static + Send + Sync + FnMut(T::Params),
370    {
371        self.on_custom_notification(T::METHOD, f)
372    }
373
374    pub fn on_request<T, F>(&mut self, f: F) -> Subscription
375    where
376        T: request::Request,
377        F: 'static + Send + Sync + FnMut(T::Params) -> Result<T::Result>,
378    {
379        self.on_custom_request(T::METHOD, f)
380    }
381
382    pub fn on_custom_notification<Params, F>(
383        &mut self,
384        method: &'static str,
385        mut f: F,
386    ) -> Subscription
387    where
388        F: 'static + Send + Sync + FnMut(Params),
389        Params: DeserializeOwned,
390    {
391        let prev_handler = self.notification_handlers.write().insert(
392            method,
393            Box::new(move |_, params, _| {
394                let params = serde_json::from_str(params)?;
395                f(params);
396                Ok(())
397            }),
398        );
399        assert!(
400            prev_handler.is_none(),
401            "registered multiple handlers for the same LSP method"
402        );
403        Subscription {
404            method,
405            notification_handlers: self.notification_handlers.clone(),
406        }
407    }
408
409    pub fn on_custom_request<Params, Res, F>(
410        &mut self,
411        method: &'static str,
412        mut f: F,
413    ) -> Subscription
414    where
415        F: 'static + Send + Sync + FnMut(Params) -> Result<Res>,
416        Params: DeserializeOwned,
417        Res: Serialize,
418    {
419        let prev_handler = self.notification_handlers.write().insert(
420            method,
421            Box::new(move |id, params, tx| {
422                if let Some(id) = id {
423                    let params = serde_json::from_str(params)?;
424                    let result = f(params)?;
425                    let response = serde_json::to_vec(&Response { id, result })?;
426                    tx.try_send(response)?;
427                }
428                Ok(())
429            }),
430        );
431        assert!(
432            prev_handler.is_none(),
433            "registered multiple handlers for the same LSP method"
434        );
435        Subscription {
436            method,
437            notification_handlers: self.notification_handlers.clone(),
438        }
439    }
440
441    pub fn name<'a>(self: &'a Arc<Self>) -> &'a str {
442        &self.name
443    }
444
445    pub fn capabilities<'a>(self: &'a Arc<Self>) -> &'a ServerCapabilities {
446        &self.capabilities
447    }
448
449    pub fn request<T: request::Request>(
450        self: &Arc<Self>,
451        params: T::Params,
452    ) -> impl Future<Output = Result<T::Result>>
453    where
454        T::Result: 'static + Send,
455    {
456        Self::request_internal::<T>(
457            &self.next_id,
458            &self.response_handlers,
459            &self.outbound_tx,
460            params,
461        )
462    }
463
464    fn request_internal<T: request::Request>(
465        next_id: &AtomicUsize,
466        response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
467        outbound_tx: &channel::Sender<Vec<u8>>,
468        params: T::Params,
469    ) -> impl 'static + Future<Output = Result<T::Result>>
470    where
471        T::Result: 'static + Send,
472    {
473        let id = next_id.fetch_add(1, SeqCst);
474        let message = serde_json::to_vec(&Request {
475            jsonrpc: JSON_RPC_VERSION,
476            id,
477            method: T::METHOD,
478            params,
479        })
480        .unwrap();
481
482        let send = outbound_tx
483            .try_send(message)
484            .context("failed to write to language server's stdin");
485
486        let (tx, rx) = oneshot::channel();
487        response_handlers.lock().insert(
488            id,
489            Box::new(move |result| {
490                let response = match result {
491                    Ok(response) => {
492                        serde_json::from_str(response).context("failed to deserialize response")
493                    }
494                    Err(error) => Err(anyhow!("{}", error.message)),
495                };
496                let _ = tx.send(response);
497            }),
498        );
499
500        async move {
501            send?;
502            rx.await?
503        }
504    }
505
506    pub fn notify<T: notification::Notification>(&self, params: T::Params) -> Result<()> {
507        Self::notify_internal::<T>(&self.outbound_tx, params)
508    }
509
510    fn notify_internal<T: notification::Notification>(
511        outbound_tx: &channel::Sender<Vec<u8>>,
512        params: T::Params,
513    ) -> Result<()> {
514        let message = serde_json::to_vec(&Notification {
515            jsonrpc: JSON_RPC_VERSION,
516            method: T::METHOD,
517            params,
518        })
519        .unwrap();
520        outbound_tx.try_send(message)?;
521        Ok(())
522    }
523}
524
525impl Drop for LanguageServer {
526    fn drop(&mut self) {
527        if let Some(shutdown) = self.shutdown() {
528            self.executor.spawn(shutdown).detach();
529        }
530    }
531}
532
533impl Subscription {
534    pub fn detach(mut self) {
535        self.method = "";
536    }
537}
538
539impl Drop for Subscription {
540    fn drop(&mut self) {
541        self.notification_handlers.write().remove(self.method);
542    }
543}
544
/// In-process stand-in for a language server, for tests and `test-support`.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeLanguageServer {
    /// Request handlers keyed by method name.
    handlers: FakeLanguageServerHandlers,
    /// Queue of serialized messages to write to the client.
    outgoing_tx: futures::channel::mpsc::UnboundedSender<Vec<u8>>,
    /// Messages from the client that did not parse as requests.
    incoming_rx: futures::channel::mpsc::UnboundedReceiver<Vec<u8>>,
    /// Task reading and dispatching messages from the client; held to keep it alive.
    _input_task: Task<Result<()>>,
    /// Task writing queued messages to the client; held to keep it alive.
    _output_task: Task<Result<()>>,
}
553
/// Shared table of fake request handlers, keyed by method name.
///
/// Each handler receives the request id, the raw parameter bytes and an app
/// context, and returns the serialized response.
#[cfg(any(test, feature = "test-support"))]
type FakeLanguageServerHandlers = Arc<
    Mutex<
        HashMap<
            &'static str,
            Box<dyn FnMut(usize, &[u8], gpui::AsyncAppContext) -> Vec<u8> + Send>,
        >,
    >,
>;
563
#[cfg(any(test, feature = "test-support"))]
impl LanguageServer {
    /// Capabilities advertised by the default fake server: document
    /// highlights, code actions, formatting and range formatting.
    pub fn full_capabilities() -> ServerCapabilities {
        let enabled = || Some(OneOf::Left(true));
        ServerCapabilities {
            document_highlight_provider: enabled(),
            code_action_provider: Some(CodeActionProviderCapability::Simple(true)),
            document_formatting_provider: enabled(),
            document_range_formatting_provider: enabled(),
            ..Default::default()
        }
    }

    /// Creates a client wired to a fake server with the full capability set.
    pub fn fake(cx: &mut gpui::MutableAppContext) -> (Self, FakeLanguageServer) {
        let capabilities = Self::full_capabilities();
        Self::fake_with_capabilities(capabilities, cx)
    }

    /// Creates a client wired through in-memory pipes to a fake server that
    /// answers `initialize` with the given capabilities.
    pub fn fake_with_capabilities(
        capabilities: ServerCapabilities,
        cx: &mut gpui::MutableAppContext,
    ) -> (Self, FakeLanguageServer) {
        // One pipe per direction: the client writes "stdin" and reads "stdout".
        let (stdin_writer, stdin_reader) = async_pipe::pipe();
        let (stdout_writer, stdout_reader) = async_pipe::pipe();

        let mut fake = FakeLanguageServer::new(stdin_reader, stdout_writer, cx);
        fake.handle_request::<request::Initialize, _>(move |_, _| InitializeResult {
            capabilities: capabilities.clone(),
            ..Default::default()
        });

        let background = cx.background().clone();
        let root = Path::new("/");
        let server = Self::new_internal(stdin_writer, stdout_reader, root, None, background);
        (server, fake)
    }
}
602
603#[cfg(any(test, feature = "test-support"))]
604impl FakeLanguageServer {
605    fn new(
606        stdin: async_pipe::PipeReader,
607        stdout: async_pipe::PipeWriter,
608        cx: &mut gpui::MutableAppContext,
609    ) -> Self {
610        use futures::StreamExt as _;
611
612        let (incoming_tx, incoming_rx) = futures::channel::mpsc::unbounded();
613        let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded();
614        let handlers = FakeLanguageServerHandlers::default();
615
616        let input_task = cx.spawn(|cx| {
617            let handlers = handlers.clone();
618            let outgoing_tx = outgoing_tx.clone();
619            async move {
620                let mut buffer = Vec::new();
621                let mut stdin = smol::io::BufReader::new(stdin);
622                while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
623                    cx.background().simulate_random_delay().await;
624
625                    if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
626                        assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
627
628                        let response;
629                        if let Some(handler) = handlers.lock().get_mut(request.method) {
630                            response =
631                                handler(request.id, request.params.get().as_bytes(), cx.clone());
632                            log::debug!("handled lsp request. method:{}", request.method);
633                        } else {
634                            response = serde_json::to_vec(&AnyResponse {
635                                id: request.id,
636                                error: Some(Error {
637                                    message: "no handler".to_string(),
638                                }),
639                                result: None,
640                            })
641                            .unwrap();
642                            log::debug!("unhandled lsp request. method:{}", request.method);
643                        }
644                        outgoing_tx.unbounded_send(response)?;
645                    } else {
646                        incoming_tx.unbounded_send(buffer.clone())?;
647                    }
648                }
649                Ok::<_, anyhow::Error>(())
650            }
651        });
652
653        let output_task = cx.background().spawn(async move {
654            let mut stdout = smol::io::BufWriter::new(stdout);
655            while let Some(message) = outgoing_rx.next().await {
656                stdout
657                    .write_all(CONTENT_LEN_HEADER.as_bytes())
658                    .await
659                    .unwrap();
660                stdout
661                    .write_all((format!("{}", message.len())).as_bytes())
662                    .await
663                    .unwrap();
664                stdout.write_all("\r\n\r\n".as_bytes()).await.unwrap();
665                stdout.write_all(&message).await.unwrap();
666                stdout.flush().await.unwrap();
667            }
668            Ok(())
669        });
670
671        Self {
672            outgoing_tx,
673            incoming_rx,
674            handlers,
675            _input_task: input_task,
676            _output_task: output_task,
677        }
678    }
679
680    pub fn notify<T: notification::Notification>(&mut self, params: T::Params) {
681        let message = serde_json::to_vec(&Notification {
682            jsonrpc: JSON_RPC_VERSION,
683            method: T::METHOD,
684            params,
685        })
686        .unwrap();
687        self.outgoing_tx.unbounded_send(message).unwrap();
688    }
689
690    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
691        use futures::StreamExt as _;
692
693        loop {
694            let bytes = self.incoming_rx.next().await.unwrap();
695            if let Ok(notification) = serde_json::from_slice::<Notification<T::Params>>(&bytes) {
696                assert_eq!(notification.method, T::METHOD);
697                return notification.params;
698            } else {
699                log::info!(
700                    "skipping message in fake language server {:?}",
701                    std::str::from_utf8(&bytes)
702                );
703            }
704        }
705    }
706
707    pub fn handle_request<T, F>(
708        &mut self,
709        mut handler: F,
710    ) -> futures::channel::mpsc::UnboundedReceiver<()>
711    where
712        T: 'static + request::Request,
713        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> T::Result,
714    {
715        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
716        self.handlers.lock().insert(
717            T::METHOD,
718            Box::new(move |id, params, cx| {
719                let result = handler(serde_json::from_slice::<T::Params>(params).unwrap(), cx);
720                let result = serde_json::to_string(&result).unwrap();
721                let result = serde_json::from_str::<&RawValue>(&result).unwrap();
722                let response = AnyResponse {
723                    id,
724                    error: None,
725                    result: Some(result),
726                };
727                responded_tx.unbounded_send(()).ok();
728                serde_json::to_vec(&response).unwrap()
729            }),
730        );
731        responded_rx
732    }
733
734    pub fn remove_request_handler<T>(&mut self)
735    where
736        T: 'static + request::Request,
737    {
738        self.handlers.lock().remove(T::METHOD);
739    }
740
741    pub async fn start_progress(&mut self, token: impl Into<String>) {
742        self.notify::<notification::Progress>(ProgressParams {
743            token: NumberOrString::String(token.into()),
744            value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
745        });
746    }
747
748    pub async fn end_progress(&mut self, token: impl Into<String>) {
749        self.notify::<notification::Progress>(ProgressParams {
750            token: NumberOrString::String(token.into()),
751            value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
752        });
753    }
754
755    async fn receive(
756        stdin: &mut smol::io::BufReader<async_pipe::PipeReader>,
757        buffer: &mut Vec<u8>,
758    ) -> Result<()> {
759        buffer.clear();
760        stdin.read_until(b'\n', buffer).await?;
761        stdin.read_until(b'\n', buffer).await?;
762        let message_len: usize = std::str::from_utf8(buffer)
763            .unwrap()
764            .strip_prefix(CONTENT_LEN_HEADER)
765            .ok_or_else(|| anyhow!("invalid content length header"))?
766            .trim_end()
767            .parse()
768            .unwrap();
769        buffer.resize(message_len, 0);
770        stdin.read_exact(buffer).await?;
771        Ok(())
772    }
773}
774
775struct ClearResponseHandlers(Arc<Mutex<HashMap<usize, ResponseHandler>>>);
776
777impl Drop for ClearResponseHandlers {
778    fn drop(&mut self) {
779        self.0.lock().clear();
780    }
781}
782
#[cfg(test)]
mod tests {
    use super::*;
    use gpui::TestAppContext;

    /// Enables logging for the test binary when `RUST_LOG` is set.
    #[ctor::ctor]
    fn init_logger() {
        if std::env::var("RUST_LOG").is_ok() {
            env_logger::init();
        }
    }

    /// Exercises a full round trip against the fake server: handler
    /// registration, initialization, notifications in both directions, and
    /// shutdown on drop.
    #[gpui::test]
    async fn test_fake(cx: &mut TestAppContext) {
        let (mut server, mut fake) = cx.update(LanguageServer::fake);

        // Forward notifications from the fake server into channels the test can read.
        let (message_tx, message_rx) = channel::unbounded();
        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
        server
            .on_notification::<notification::ShowMessage, _>(move |params| {
                message_tx.try_send(params).unwrap()
            })
            .detach();
        server
            .on_notification::<notification::PublishDiagnostics, _>(move |params| {
                diagnostics_tx.try_send(params).unwrap()
            })
            .detach();

        let server = server.initialize().await.unwrap();

        // Client -> server notification.
        let opened_document = TextDocumentItem::new(
            Url::from_str("file://a/b").unwrap(),
            "rust".to_string(),
            0,
            "".to_string(),
        );
        server
            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
                text_document: opened_document,
            })
            .unwrap();
        let received = fake
            .receive_notification::<notification::DidOpenTextDocument>()
            .await;
        assert_eq!(received.text_document.uri.as_str(), "file://a/b");

        // Server -> client notifications.
        fake.notify::<notification::ShowMessage>(ShowMessageParams {
            typ: MessageType::ERROR,
            message: "ok".to_string(),
        });
        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
            uri: Url::from_str("file://b/c").unwrap(),
            version: Some(5),
            diagnostics: vec![],
        });
        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
        assert_eq!(
            diagnostics_rx.recv().await.unwrap().uri.as_str(),
            "file://b/c"
        );

        // Dropping the client triggers the shutdown request followed by `exit`.
        fake.handle_request::<request::Shutdown, _>(|_, _| ());

        drop(server);
        fake.receive_notification::<notification::Exit>().await;
    }
}