lsp.rs

  1use anyhow::{anyhow, Context, Result};
  2use collections::HashMap;
  3use futures::{channel::oneshot, io::BufWriter, AsyncRead, AsyncWrite};
  4use gpui::{executor, Task};
  5use parking_lot::{Mutex, RwLock};
  6use postage::{barrier, prelude::Stream, watch};
  7use serde::{Deserialize, Serialize};
  8use serde_json::{json, value::RawValue, Value};
  9use smol::{
 10    channel,
 11    io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
 12    process::Command,
 13};
 14use std::{
 15    future::Future,
 16    io::Write,
 17    str::FromStr,
 18    sync::{
 19        atomic::{AtomicUsize, Ordering::SeqCst},
 20        Arc,
 21    },
 22};
 23use std::{path::Path, process::Stdio};
 24use util::TryFutureExt;
 25
 26pub use lsp_types::*;
 27
 28const JSON_RPC_VERSION: &'static str = "2.0";
 29const CONTENT_LEN_HEADER: &'static str = "Content-Length: ";
 30
 31type NotificationHandler = Box<dyn Send + Sync + FnMut(&str)>;
 32type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
 33
 34pub struct LanguageServer {
 35    next_id: AtomicUsize,
 36    outbound_tx: channel::Sender<Vec<u8>>,
 37    capabilities: watch::Receiver<Option<ServerCapabilities>>,
 38    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
 39    response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
 40    executor: Arc<executor::Background>,
 41    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
 42    initialized: barrier::Receiver,
 43    output_done_rx: Mutex<Option<barrier::Receiver>>,
 44}
 45
 46pub struct Subscription {
 47    method: &'static str,
 48    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
 49}
 50
 51#[derive(Serialize, Deserialize)]
 52struct Request<'a, T> {
 53    jsonrpc: &'a str,
 54    id: usize,
 55    method: &'a str,
 56    params: T,
 57}
 58
 59#[cfg(any(test, feature = "test-support"))]
 60#[derive(Deserialize)]
 61struct AnyRequest<'a> {
 62    id: usize,
 63    #[serde(borrow)]
 64    jsonrpc: &'a str,
 65    #[serde(borrow)]
 66    method: &'a str,
 67    #[serde(borrow)]
 68    params: &'a RawValue,
 69}
 70
 71#[derive(Serialize, Deserialize)]
 72struct AnyResponse<'a> {
 73    id: usize,
 74    #[serde(default)]
 75    error: Option<Error>,
 76    #[serde(borrow)]
 77    result: Option<&'a RawValue>,
 78}
 79
 80#[derive(Serialize, Deserialize)]
 81struct Notification<'a, T> {
 82    #[serde(borrow)]
 83    jsonrpc: &'a str,
 84    #[serde(borrow)]
 85    method: &'a str,
 86    params: T,
 87}
 88
 89#[derive(Deserialize)]
 90struct AnyNotification<'a> {
 91    #[serde(borrow)]
 92    method: &'a str,
 93    #[serde(borrow)]
 94    params: &'a RawValue,
 95}
 96
 97#[derive(Debug, Serialize, Deserialize)]
 98struct Error {
 99    message: String,
100}
101
102impl LanguageServer {
103    pub fn new(
104        binary_path: &Path,
105        args: &[&str],
106        options: Option<Value>,
107        root_path: &Path,
108        background: Arc<executor::Background>,
109    ) -> Result<Arc<Self>> {
110        let mut server = Command::new(binary_path)
111            .current_dir(root_path)
112            .args(args)
113            .stdin(Stdio::piped())
114            .stdout(Stdio::piped())
115            .stderr(Stdio::inherit())
116            .spawn()?;
117        let stdin = server.stdin.take().unwrap();
118        let stdout = server.stdout.take().unwrap();
119        Self::new_internal(stdin, stdout, root_path, options, background)
120    }
121
122    fn new_internal<Stdin, Stdout>(
123        stdin: Stdin,
124        stdout: Stdout,
125        root_path: &Path,
126        options: Option<Value>,
127        executor: Arc<executor::Background>,
128    ) -> Result<Arc<Self>>
129    where
130        Stdin: AsyncWrite + Unpin + Send + 'static,
131        Stdout: AsyncRead + Unpin + Send + 'static,
132    {
133        let mut stdin = BufWriter::new(stdin);
134        let mut stdout = BufReader::new(stdout);
135        let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
136        let notification_handlers =
137            Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::default()));
138        let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::default()));
139        let input_task = executor.spawn(
140            {
141                let notification_handlers = notification_handlers.clone();
142                let response_handlers = response_handlers.clone();
143                async move {
144                    let _clear_response_handlers = ClearResponseHandlers(response_handlers.clone());
145                    let mut buffer = Vec::new();
146                    loop {
147                        buffer.clear();
148                        stdout.read_until(b'\n', &mut buffer).await?;
149                        stdout.read_until(b'\n', &mut buffer).await?;
150                        let message_len: usize = std::str::from_utf8(&buffer)?
151                            .strip_prefix(CONTENT_LEN_HEADER)
152                            .ok_or_else(|| anyhow!("invalid header"))?
153                            .trim_end()
154                            .parse()?;
155
156                        buffer.resize(message_len, 0);
157                        stdout.read_exact(&mut buffer).await?;
158
159                        if let Ok(AnyNotification { method, params }) =
160                            serde_json::from_slice(&buffer)
161                        {
162                            if let Some(handler) = notification_handlers.write().get_mut(method) {
163                                handler(params.get());
164                            } else {
165                                log::info!(
166                                    "unhandled notification {}:\n{}",
167                                    method,
168                                    serde_json::to_string_pretty(
169                                        &Value::from_str(params.get()).unwrap()
170                                    )
171                                    .unwrap()
172                                );
173                            }
174                        } else if let Ok(AnyResponse { id, error, result }) =
175                            serde_json::from_slice(&buffer)
176                        {
177                            if let Some(handler) = response_handlers.lock().remove(&id) {
178                                if let Some(error) = error {
179                                    handler(Err(error));
180                                } else if let Some(result) = result {
181                                    handler(Ok(result.get()));
182                                } else {
183                                    handler(Ok("null"));
184                                }
185                            }
186                        } else {
187                            return Err(anyhow!(
188                                "failed to deserialize message:\n{}",
189                                std::str::from_utf8(&buffer)?
190                            ));
191                        }
192                    }
193                }
194            }
195            .log_err(),
196        );
197        let (output_done_tx, output_done_rx) = barrier::channel();
198        let output_task = executor.spawn({
199            let response_handlers = response_handlers.clone();
200            async move {
201                let _clear_response_handlers = ClearResponseHandlers(response_handlers);
202                let mut content_len_buffer = Vec::new();
203                while let Ok(message) = outbound_rx.recv().await {
204                    content_len_buffer.clear();
205                    write!(content_len_buffer, "{}", message.len()).unwrap();
206                    stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
207                    stdin.write_all(&content_len_buffer).await?;
208                    stdin.write_all("\r\n\r\n".as_bytes()).await?;
209                    stdin.write_all(&message).await?;
210                    stdin.flush().await?;
211                }
212                drop(output_done_tx);
213                Ok(())
214            }
215            .log_err()
216        });
217
218        let (initialized_tx, initialized_rx) = barrier::channel();
219        let (mut capabilities_tx, capabilities_rx) = watch::channel();
220        let this = Arc::new(Self {
221            notification_handlers,
222            response_handlers,
223            capabilities: capabilities_rx,
224            next_id: Default::default(),
225            outbound_tx,
226            executor: executor.clone(),
227            io_tasks: Mutex::new(Some((input_task, output_task))),
228            initialized: initialized_rx,
229            output_done_rx: Mutex::new(Some(output_done_rx)),
230        });
231
232        let root_uri = Url::from_file_path(root_path).map_err(|_| anyhow!("invalid root path"))?;
233        executor
234            .spawn({
235                let this = this.clone();
236                async move {
237                    if let Some(capabilities) = this.init(root_uri, options).log_err().await {
238                        *capabilities_tx.borrow_mut() = Some(capabilities);
239                    }
240
241                    drop(initialized_tx);
242                }
243            })
244            .detach();
245
246        Ok(this)
247    }
248
249    async fn init(
250        self: Arc<Self>,
251        root_uri: Url,
252        options: Option<Value>,
253    ) -> Result<ServerCapabilities> {
254        #[allow(deprecated)]
255        let params = InitializeParams {
256            process_id: Default::default(),
257            root_path: Default::default(),
258            root_uri: Some(root_uri),
259            initialization_options: options,
260            capabilities: ClientCapabilities {
261                text_document: Some(TextDocumentClientCapabilities {
262                    definition: Some(GotoCapability {
263                        link_support: Some(true),
264                        ..Default::default()
265                    }),
266                    code_action: Some(CodeActionClientCapabilities {
267                        code_action_literal_support: Some(CodeActionLiteralSupport {
268                            code_action_kind: CodeActionKindLiteralSupport {
269                                value_set: vec![
270                                    CodeActionKind::REFACTOR.as_str().into(),
271                                    CodeActionKind::QUICKFIX.as_str().into(),
272                                ],
273                            },
274                        }),
275                        data_support: Some(true),
276                        resolve_support: Some(CodeActionCapabilityResolveSupport {
277                            properties: vec!["edit".to_string()],
278                        }),
279                        ..Default::default()
280                    }),
281                    completion: Some(CompletionClientCapabilities {
282                        completion_item: Some(CompletionItemCapability {
283                            snippet_support: Some(true),
284                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
285                                properties: vec!["additionalTextEdits".to_string()],
286                            }),
287                            ..Default::default()
288                        }),
289                        ..Default::default()
290                    }),
291                    ..Default::default()
292                }),
293                experimental: Some(json!({
294                    "serverStatusNotification": true,
295                })),
296                window: Some(WindowClientCapabilities {
297                    work_done_progress: Some(true),
298                    ..Default::default()
299                }),
300                ..Default::default()
301            },
302            trace: Default::default(),
303            workspace_folders: Default::default(),
304            client_info: Default::default(),
305            locale: Default::default(),
306        };
307
308        let this = self.clone();
309        let request = Self::request_internal::<request::Initialize>(
310            &this.next_id,
311            &this.response_handlers,
312            &this.outbound_tx,
313            params,
314        );
315        let response = request.await?;
316        Self::notify_internal::<notification::Initialized>(
317            &this.outbound_tx,
318            InitializedParams {},
319        )?;
320        Ok(response.capabilities)
321    }
322
323    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
324        if let Some(tasks) = self.io_tasks.lock().take() {
325            let response_handlers = self.response_handlers.clone();
326            let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
327            let outbound_tx = self.outbound_tx.clone();
328            let mut output_done = self.output_done_rx.lock().take().unwrap();
329            let shutdown_request = Self::request_internal::<request::Shutdown>(
330                &next_id,
331                &response_handlers,
332                &outbound_tx,
333                (),
334            );
335            let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
336            outbound_tx.close();
337            Some(
338                async move {
339                    log::debug!("language server shutdown started");
340                    shutdown_request.await?;
341                    response_handlers.lock().clear();
342                    exit?;
343                    output_done.recv().await;
344                    log::debug!("language server shutdown finished");
345                    drop(tasks);
346                    Ok(())
347                }
348                .log_err(),
349            )
350        } else {
351            None
352        }
353    }
354
355    pub fn on_notification<T, F>(&self, mut f: F) -> Subscription
356    where
357        T: notification::Notification,
358        F: 'static + Send + Sync + FnMut(T::Params),
359    {
360        let prev_handler = self.notification_handlers.write().insert(
361            T::METHOD,
362            Box::new(
363                move |notification| match serde_json::from_str(notification) {
364                    Ok(notification) => f(notification),
365                    Err(err) => log::error!("error parsing notification {}: {}", T::METHOD, err),
366                },
367            ),
368        );
369
370        assert!(
371            prev_handler.is_none(),
372            "registered multiple handlers for the same notification"
373        );
374
375        Subscription {
376            method: T::METHOD,
377            notification_handlers: self.notification_handlers.clone(),
378        }
379    }
380
381    pub fn capabilities(&self) -> impl 'static + Future<Output = Option<ServerCapabilities>> {
382        let mut rx = self.capabilities.clone();
383        async move {
384            loop {
385                let value = rx.recv().await?;
386                if value.is_some() {
387                    return value;
388                }
389            }
390        }
391    }
392
393    pub fn request<T: request::Request>(
394        self: &Arc<Self>,
395        params: T::Params,
396    ) -> impl Future<Output = Result<T::Result>>
397    where
398        T::Result: 'static + Send,
399    {
400        let this = self.clone();
401        async move {
402            this.initialized.clone().recv().await;
403            Self::request_internal::<T>(
404                &this.next_id,
405                &this.response_handlers,
406                &this.outbound_tx,
407                params,
408            )
409            .await
410        }
411    }
412
413    fn request_internal<T: request::Request>(
414        next_id: &AtomicUsize,
415        response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
416        outbound_tx: &channel::Sender<Vec<u8>>,
417        params: T::Params,
418    ) -> impl 'static + Future<Output = Result<T::Result>>
419    where
420        T::Result: 'static + Send,
421    {
422        let id = next_id.fetch_add(1, SeqCst);
423        let message = serde_json::to_vec(&Request {
424            jsonrpc: JSON_RPC_VERSION,
425            id,
426            method: T::METHOD,
427            params,
428        })
429        .unwrap();
430
431        let send = outbound_tx
432            .try_send(message)
433            .context("failed to write to language server's stdin");
434
435        let (tx, rx) = oneshot::channel();
436        response_handlers.lock().insert(
437            id,
438            Box::new(move |result| {
439                let response = match result {
440                    Ok(response) => {
441                        serde_json::from_str(response).context("failed to deserialize response")
442                    }
443                    Err(error) => Err(anyhow!("{}", error.message)),
444                };
445                let _ = tx.send(response);
446            }),
447        );
448
449        async move {
450            send?;
451            rx.await?
452        }
453    }
454
455    pub fn notify<T: notification::Notification>(
456        self: &Arc<Self>,
457        params: T::Params,
458    ) -> impl Future<Output = Result<()>> {
459        let this = self.clone();
460        async move {
461            this.initialized.clone().recv().await;
462            Self::notify_internal::<T>(&this.outbound_tx, params)?;
463            Ok(())
464        }
465    }
466
467    fn notify_internal<T: notification::Notification>(
468        outbound_tx: &channel::Sender<Vec<u8>>,
469        params: T::Params,
470    ) -> Result<()> {
471        let message = serde_json::to_vec(&Notification {
472            jsonrpc: JSON_RPC_VERSION,
473            method: T::METHOD,
474            params,
475        })
476        .unwrap();
477        outbound_tx.try_send(message)?;
478        Ok(())
479    }
480}
481
482impl Drop for LanguageServer {
483    fn drop(&mut self) {
484        if let Some(shutdown) = self.shutdown() {
485            self.executor.spawn(shutdown).detach();
486        }
487    }
488}
489
490impl Subscription {
491    pub fn detach(mut self) {
492        self.method = "";
493    }
494}
495
496impl Drop for Subscription {
497    fn drop(&mut self) {
498        self.notification_handlers.write().remove(self.method);
499    }
500}
501
/// In-process stand-in for a language server, available only in tests.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeLanguageServer {
    /// Request handlers keyed by method name.
    handlers: FakeLanguageServerHandlers,
    /// Messages queued here are framed and written to the client by the output task.
    outgoing_tx: futures::channel::mpsc::UnboundedSender<Vec<u8>>,
    /// Messages from the client that did not parse as requests.
    incoming_rx: futures::channel::mpsc::UnboundedReceiver<Vec<u8>>,
    /// Held so the task reading from the client lives as long as the fake server.
    _input_task: Task<Result<()>>,
    /// Held so the task writing to the client lives as long as the fake server.
    _output_task: Task<Result<()>>,
}
510
/// Shared map from method name to a handler that turns a request (id, raw parameter
/// bytes, app context) into the serialized bytes of its response.
#[cfg(any(test, feature = "test-support"))]
type FakeLanguageServerHandlers = Arc<
    Mutex<
        HashMap<
            &'static str,
            Box<dyn FnMut(usize, &[u8], gpui::AsyncAppContext) -> Vec<u8> + Send>,
        >,
    >,
>;
520
#[cfg(any(test, feature = "test-support"))]
impl LanguageServer {
    /// Capabilities used by default for fake servers: document highlights, code actions,
    /// document formatting and range formatting are enabled; everything else is default.
    pub fn full_capabilities() -> ServerCapabilities {
        let enabled = || Some(OneOf::Left(true));
        ServerCapabilities {
            document_highlight_provider: enabled(),
            code_action_provider: Some(CodeActionProviderCapability::Simple(true)),
            document_formatting_provider: enabled(),
            document_range_formatting_provider: enabled(),
            ..Default::default()
        }
    }

    /// Creates a client connected to a fake server that reports `full_capabilities`.
    pub fn fake(cx: &mut gpui::MutableAppContext) -> (Arc<Self>, FakeLanguageServer) {
        let capabilities = Self::full_capabilities();
        Self::fake_with_capabilities(capabilities, cx)
    }

    /// Creates a client connected through in-memory pipes to a fake server that answers
    /// the `Initialize` request with the given `capabilities`.
    ///
    /// # Panics
    /// Panics if the client cannot be constructed.
    pub fn fake_with_capabilities(
        capabilities: ServerCapabilities,
        cx: &mut gpui::MutableAppContext,
    ) -> (Arc<Self>, FakeLanguageServer) {
        // One pipe per direction: the client writes to "stdin" and reads from "stdout".
        let (stdin_writer, stdin_reader) = async_pipe::pipe();
        let (stdout_writer, stdout_reader) = async_pipe::pipe();

        let mut fake = FakeLanguageServer::new(stdin_reader, stdout_writer, cx);
        // The handler must be in place before the client is built, because building the
        // client kicks off the initialization request.
        fake.handle_request::<request::Initialize, _>(move |_, _| InitializeResult {
            capabilities: capabilities.clone(),
            ..Default::default()
        });

        let root = Path::new("/");
        let background = cx.background().clone();
        let server =
            Self::new_internal(stdin_writer, stdout_reader, root, None, background).unwrap();

        (server, fake)
    }
}
565
566#[cfg(any(test, feature = "test-support"))]
567impl FakeLanguageServer {
568    fn new(
569        stdin: async_pipe::PipeReader,
570        stdout: async_pipe::PipeWriter,
571        cx: &mut gpui::MutableAppContext,
572    ) -> Self {
573        use futures::StreamExt as _;
574
575        let (incoming_tx, incoming_rx) = futures::channel::mpsc::unbounded();
576        let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded();
577        let handlers = FakeLanguageServerHandlers::default();
578
579        let input_task = cx.spawn(|cx| {
580            let handlers = handlers.clone();
581            let outgoing_tx = outgoing_tx.clone();
582            async move {
583                let mut buffer = Vec::new();
584                let mut stdin = smol::io::BufReader::new(stdin);
585                while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
586                    cx.background().simulate_random_delay().await;
587                    if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
588                        assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
589
590                        let response;
591                        if let Some(handler) = handlers.lock().get_mut(request.method) {
592                            response =
593                                handler(request.id, request.params.get().as_bytes(), cx.clone());
594                            log::debug!("handled lsp request. method:{}", request.method);
595                        } else {
596                            response = serde_json::to_vec(&AnyResponse {
597                                id: request.id,
598                                error: Some(Error {
599                                    message: "no handler".to_string(),
600                                }),
601                                result: None,
602                            })
603                            .unwrap();
604                            log::debug!("unhandled lsp request. method:{}", request.method);
605                        }
606                        outgoing_tx.unbounded_send(response)?;
607                    } else {
608                        incoming_tx.unbounded_send(buffer.clone())?;
609                    }
610                }
611                Ok::<_, anyhow::Error>(())
612            }
613        });
614
615        let output_task = cx.background().spawn(async move {
616            let mut stdout = smol::io::BufWriter::new(stdout);
617            while let Some(message) = outgoing_rx.next().await {
618                stdout
619                    .write_all(CONTENT_LEN_HEADER.as_bytes())
620                    .await
621                    .unwrap();
622                stdout
623                    .write_all((format!("{}", message.len())).as_bytes())
624                    .await
625                    .unwrap();
626                stdout.write_all("\r\n\r\n".as_bytes()).await.unwrap();
627                stdout.write_all(&message).await.unwrap();
628                stdout.flush().await.unwrap();
629            }
630            Ok(())
631        });
632
633        Self {
634            outgoing_tx,
635            incoming_rx,
636            handlers,
637            _input_task: input_task,
638            _output_task: output_task,
639        }
640    }
641
642    pub async fn notify<T: notification::Notification>(&mut self, params: T::Params) {
643        let message = serde_json::to_vec(&Notification {
644            jsonrpc: JSON_RPC_VERSION,
645            method: T::METHOD,
646            params,
647        })
648        .unwrap();
649        self.outgoing_tx.unbounded_send(message).unwrap();
650    }
651
652    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
653        use futures::StreamExt as _;
654
655        loop {
656            let bytes = self.incoming_rx.next().await.unwrap();
657            if let Ok(notification) = serde_json::from_slice::<Notification<T::Params>>(&bytes) {
658                assert_eq!(notification.method, T::METHOD);
659                return notification.params;
660            } else {
661                log::info!(
662                    "skipping message in fake language server {:?}",
663                    std::str::from_utf8(&bytes)
664                );
665            }
666        }
667    }
668
669    pub fn handle_request<T, F>(
670        &mut self,
671        mut handler: F,
672    ) -> futures::channel::mpsc::UnboundedReceiver<()>
673    where
674        T: 'static + request::Request,
675        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> T::Result,
676    {
677        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
678        self.handlers.lock().insert(
679            T::METHOD,
680            Box::new(move |id, params, cx| {
681                let result = handler(serde_json::from_slice::<T::Params>(params).unwrap(), cx);
682                let result = serde_json::to_string(&result).unwrap();
683                let result = serde_json::from_str::<&RawValue>(&result).unwrap();
684                let response = AnyResponse {
685                    id,
686                    error: None,
687                    result: Some(result),
688                };
689                responded_tx.unbounded_send(()).ok();
690                serde_json::to_vec(&response).unwrap()
691            }),
692        );
693        responded_rx
694    }
695
696    pub fn remove_request_handler<T>(&mut self)
697    where
698        T: 'static + request::Request,
699    {
700        self.handlers.lock().remove(T::METHOD);
701    }
702
703    pub async fn start_progress(&mut self, token: impl Into<String>) {
704        self.notify::<notification::Progress>(ProgressParams {
705            token: NumberOrString::String(token.into()),
706            value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
707        })
708        .await;
709    }
710
711    pub async fn end_progress(&mut self, token: impl Into<String>) {
712        self.notify::<notification::Progress>(ProgressParams {
713            token: NumberOrString::String(token.into()),
714            value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
715        })
716        .await;
717    }
718
719    async fn receive(
720        stdin: &mut smol::io::BufReader<async_pipe::PipeReader>,
721        buffer: &mut Vec<u8>,
722    ) -> Result<()> {
723        buffer.clear();
724        stdin.read_until(b'\n', buffer).await?;
725        stdin.read_until(b'\n', buffer).await?;
726        let message_len: usize = std::str::from_utf8(buffer)
727            .unwrap()
728            .strip_prefix(CONTENT_LEN_HEADER)
729            .ok_or_else(|| anyhow!("invalid content length header"))?
730            .trim_end()
731            .parse()
732            .unwrap();
733        buffer.resize(message_len, 0);
734        stdin.read_exact(buffer).await?;
735        Ok(())
736    }
737}
738
739struct ClearResponseHandlers(Arc<Mutex<HashMap<usize, ResponseHandler>>>);
740
741impl Drop for ClearResponseHandlers {
742    fn drop(&mut self) {
743        self.0.lock().clear();
744    }
745}
746
#[cfg(test)]
mod tests {
    use super::*;
    use gpui::TestAppContext;

    /// Initializes `env_logger` when the test binary loads, but only if `RUST_LOG` is set.
    #[ctor::ctor]
    fn init_logger() {
        if std::env::var("RUST_LOG").is_err() {
            return;
        }
        env_logger::init();
    }

    /// Exercises the client against the fake server: a client notification, two server
    /// notifications, and the exit notification sent when the client is dropped.
    #[gpui::test]
    async fn test_fake(cx: &mut TestAppContext) {
        let (server, mut fake) = cx.update(LanguageServer::fake);

        // Forward server notifications into channels the test can await.
        let (message_tx, message_rx) = channel::unbounded();
        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
        server
            .on_notification::<notification::ShowMessage, _>(move |params| {
                message_tx.try_send(params).unwrap()
            })
            .detach();
        server
            .on_notification::<notification::PublishDiagnostics, _>(move |params| {
                diagnostics_tx.try_send(params).unwrap()
            })
            .detach();

        // Client -> server.
        let document = TextDocumentItem::new(
            Url::from_str("file://a/b").unwrap(),
            "rust".to_string(),
            0,
            "".to_string(),
        );
        server
            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
                text_document: document,
            })
            .await
            .unwrap();
        let opened = fake
            .receive_notification::<notification::DidOpenTextDocument>()
            .await;
        assert_eq!(opened.text_document.uri.as_str(), "file://a/b");

        // Server -> client.
        fake.notify::<notification::ShowMessage>(ShowMessageParams {
            typ: MessageType::ERROR,
            message: "ok".to_string(),
        })
        .await;
        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
            uri: Url::from_str("file://b/c").unwrap(),
            version: Some(5),
            diagnostics: vec![],
        })
        .await;
        let shown = message_rx.recv().await.unwrap();
        assert_eq!(shown.message, "ok");
        let published = diagnostics_rx.recv().await.unwrap();
        assert_eq!(published.uri.as_str(), "file://b/c");

        // Dropping the client triggers the shutdown sequence, which ends with `Exit`.
        fake.handle_request::<request::Shutdown, _>(|_, _| ());

        drop(server);
        fake.receive_notification::<notification::Exit>().await;
    }

    /// Notification type for the `experimental/serverStatus` method.
    pub enum ServerStatusNotification {}

    impl notification::Notification for ServerStatusNotification {
        type Params = ServerStatusParams;
        const METHOD: &'static str = "experimental/serverStatus";
    }

    /// Parameters of `ServerStatusNotification`.
    #[derive(Deserialize, Serialize, PartialEq, Eq, Clone)]
    pub struct ServerStatusParams {
        pub quiescent: bool,
    }
}