1use anyhow::{anyhow, Context, Result};
2use futures::{io::BufWriter, AsyncRead, AsyncWrite};
3use gpui::{executor, Task};
4use parking_lot::{Mutex, RwLock};
5use postage::{barrier, oneshot, prelude::Stream, sink::Sink, watch};
6use serde::{Deserialize, Serialize};
7use serde_json::{json, value::RawValue, Value};
8use smol::{
9 channel,
10 io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
11 process::Command,
12};
13use std::{
14 collections::HashMap,
15 future::Future,
16 io::Write,
17 str::FromStr,
18 sync::{
19 atomic::{AtomicUsize, Ordering::SeqCst},
20 Arc,
21 },
22};
23use std::{path::Path, process::Stdio};
24use util::TryFutureExt;
25
26pub use lsp_types::*;
27
/// Value written to the `jsonrpc` field of the requests and notifications built in this file.
const JSON_RPC_VERSION: &str = "2.0";
/// Prefix of the header line that precedes every message body and carries its length in bytes.
const CONTENT_LEN_HEADER: &str = "Content-Length: ";
30
/// Callback invoked by the input task with the raw JSON text of a notification's `params`.
type NotificationHandler = Box<dyn Send + Sync + FnMut(&str)>;
/// One-shot callback invoked by the input task with either the raw JSON text of a
/// response's `result` or the error carried by the response.
type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
33
/// Client-side handle to a language server that is spoken to over a pair of byte streams.
pub struct LanguageServer {
    /// Source of ids for outgoing requests; bumped by one for every request that is built.
    next_id: AtomicUsize,
    /// Queue of serialized messages consumed by the output task. `shutdown` takes the
    /// sender out, leaving `None`.
    outbound_tx: RwLock<Option<channel::Sender<Vec<u8>>>>,
    /// Holds the server's capabilities once the `initialize` request has succeeded.
    capabilities: watch::Receiver<Option<ServerCapabilities>>,
    /// Notification callbacks keyed by method name; shared with the input task and with
    /// every `Subscription`.
    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
    /// Callbacks for in-flight requests keyed by request id; shared with the input task.
    response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
    /// Background executor; used to run the shutdown future when the server is dropped.
    executor: Arc<executor::Background>,
    /// The (input, output) tasks. `shutdown` takes them out, leaving `None`.
    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
    /// Barrier whose sending half is dropped once the initialization attempt has finished.
    initialized: barrier::Receiver,
    /// Barrier whose sending half is owned by the output task; taken out by `shutdown`.
    output_done_rx: Mutex<Option<barrier::Receiver>>,
}
45
/// Handle returned by `LanguageServer::on_notification`.
///
/// Dropping it removes the handler registered under `method` from the shared handler map.
pub struct Subscription {
    /// Method name under which the handler was registered.
    method: &'static str,
    /// The handler map shared with the owning `LanguageServer`.
    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
}
50
/// An outgoing request message carrying method-specific `params` of type `T`.
#[derive(Serialize, Deserialize)]
struct Request<'a, T> {
    jsonrpc: &'a str,
    id: usize,
    method: &'a str,
    params: T,
}
58
/// A request whose `params` are kept as unparsed JSON.
///
/// Used by the fake language server to look up a handler by `method` before the
/// parameters are decoded.
#[cfg(any(test, feature = "test-support"))]
#[derive(Deserialize)]
struct AnyRequest<'a> {
    id: usize,
    #[serde(borrow)]
    jsonrpc: &'a str,
    #[serde(borrow)]
    method: &'a str,
    #[serde(borrow)]
    params: &'a RawValue,
}
70
/// A response message whose `result` is kept as unparsed JSON.
///
/// The input task matches it to a pending request by `id`; when both `error` and
/// `result` are absent the handler is given the JSON text `null`.
#[derive(Serialize, Deserialize)]
struct AnyResponse<'a> {
    id: usize,
    #[serde(default)]
    error: Option<Error>,
    #[serde(borrow)]
    result: Option<&'a RawValue>,
}
79
/// A notification message (a message without an `id`) carrying `params` of type `T`.
#[derive(Serialize, Deserialize)]
struct Notification<'a, T> {
    #[serde(borrow)]
    jsonrpc: &'a str,
    #[serde(borrow)]
    method: &'a str,
    params: T,
}
88
/// An incoming notification whose `params` are kept as unparsed JSON so that they can be
/// handed to the handler registered for `method`.
///
/// NOTE(review): `params` is required here, so a message that omits it will not decode
/// as a notification — confirm that this is intended.
#[derive(Deserialize)]
struct AnyNotification<'a> {
    #[serde(borrow)]
    method: &'a str,
    #[serde(borrow)]
    params: &'a RawValue,
}
96
/// The `error` member of a response; only its `message` is modelled.
#[derive(Debug, Serialize, Deserialize)]
struct Error {
    message: String,
}
101
102impl LanguageServer {
103 pub fn new(
104 binary_path: &Path,
105 root_path: &Path,
106 background: Arc<executor::Background>,
107 ) -> Result<Arc<Self>> {
108 let mut server = Command::new(binary_path)
109 .stdin(Stdio::piped())
110 .stdout(Stdio::piped())
111 .stderr(Stdio::inherit())
112 .spawn()?;
113 let stdin = server.stdin.take().unwrap();
114 let stdout = server.stdout.take().unwrap();
115 Self::new_internal(stdin, stdout, root_path, background)
116 }
117
118 fn new_internal<Stdin, Stdout>(
119 stdin: Stdin,
120 stdout: Stdout,
121 root_path: &Path,
122 executor: Arc<executor::Background>,
123 ) -> Result<Arc<Self>>
124 where
125 Stdin: AsyncWrite + Unpin + Send + 'static,
126 Stdout: AsyncRead + Unpin + Send + 'static,
127 {
128 let mut stdin = BufWriter::new(stdin);
129 let mut stdout = BufReader::new(stdout);
130 let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
131 let notification_handlers = Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::new()));
132 let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::new()));
133 let input_task = executor.spawn(
134 {
135 let notification_handlers = notification_handlers.clone();
136 let response_handlers = response_handlers.clone();
137 async move {
138 let mut buffer = Vec::new();
139 loop {
140 buffer.clear();
141 stdout.read_until(b'\n', &mut buffer).await?;
142 stdout.read_until(b'\n', &mut buffer).await?;
143 let message_len: usize = std::str::from_utf8(&buffer)?
144 .strip_prefix(CONTENT_LEN_HEADER)
145 .ok_or_else(|| anyhow!("invalid header"))?
146 .trim_end()
147 .parse()?;
148
149 buffer.resize(message_len, 0);
150 stdout.read_exact(&mut buffer).await?;
151
152 if let Ok(AnyNotification { method, params }) =
153 serde_json::from_slice(&buffer)
154 {
155 if let Some(handler) = notification_handlers.write().get_mut(method) {
156 handler(params.get());
157 } else {
158 log::info!(
159 "unhandled notification {}:\n{}",
160 method,
161 serde_json::to_string_pretty(
162 &Value::from_str(params.get()).unwrap()
163 )
164 .unwrap()
165 );
166 }
167 } else if let Ok(AnyResponse { id, error, result }) =
168 serde_json::from_slice(&buffer)
169 {
170 if let Some(handler) = response_handlers.lock().remove(&id) {
171 if let Some(error) = error {
172 handler(Err(error));
173 } else if let Some(result) = result {
174 handler(Ok(result.get()));
175 } else {
176 handler(Ok("null"));
177 }
178 }
179 } else {
180 return Err(anyhow!(
181 "failed to deserialize message:\n{}",
182 std::str::from_utf8(&buffer)?
183 ));
184 }
185 }
186 }
187 }
188 .log_err(),
189 );
190 let (output_done_tx, output_done_rx) = barrier::channel();
191 let output_task = executor.spawn(
192 async move {
193 let mut content_len_buffer = Vec::new();
194 while let Ok(message) = outbound_rx.recv().await {
195 content_len_buffer.clear();
196 write!(content_len_buffer, "{}", message.len()).unwrap();
197 stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
198 stdin.write_all(&content_len_buffer).await?;
199 stdin.write_all("\r\n\r\n".as_bytes()).await?;
200 stdin.write_all(&message).await?;
201 stdin.flush().await?;
202 }
203 drop(output_done_tx);
204 Ok(())
205 }
206 .log_err(),
207 );
208
209 let (initialized_tx, initialized_rx) = barrier::channel();
210 let (mut capabilities_tx, capabilities_rx) = watch::channel();
211 let this = Arc::new(Self {
212 notification_handlers,
213 response_handlers,
214 capabilities: capabilities_rx,
215 next_id: Default::default(),
216 outbound_tx: RwLock::new(Some(outbound_tx)),
217 executor: executor.clone(),
218 io_tasks: Mutex::new(Some((input_task, output_task))),
219 initialized: initialized_rx,
220 output_done_rx: Mutex::new(Some(output_done_rx)),
221 });
222
223 let root_uri = Url::from_file_path(root_path).map_err(|_| anyhow!("invalid root path"))?;
224 executor
225 .spawn({
226 let this = this.clone();
227 async move {
228 if let Some(capabilities) = this.init(root_uri).log_err().await {
229 *capabilities_tx.borrow_mut() = Some(capabilities);
230 }
231
232 drop(initialized_tx);
233 }
234 })
235 .detach();
236
237 Ok(this)
238 }
239
240 async fn init(self: Arc<Self>, root_uri: Url) -> Result<ServerCapabilities> {
241 #[allow(deprecated)]
242 let params = InitializeParams {
243 process_id: Default::default(),
244 root_path: Default::default(),
245 root_uri: Some(root_uri),
246 initialization_options: Default::default(),
247 capabilities: ClientCapabilities {
248 text_document: Some(TextDocumentClientCapabilities {
249 definition: Some(GotoCapability {
250 link_support: Some(true),
251 ..Default::default()
252 }),
253 code_action: Some(CodeActionClientCapabilities {
254 code_action_literal_support: Some(CodeActionLiteralSupport {
255 code_action_kind: CodeActionKindLiteralSupport {
256 value_set: vec![
257 CodeActionKind::REFACTOR.as_str().into(),
258 CodeActionKind::QUICKFIX.as_str().into(),
259 ],
260 },
261 }),
262 data_support: Some(true),
263 resolve_support: Some(CodeActionCapabilityResolveSupport {
264 properties: vec!["edit".to_string()],
265 }),
266 ..Default::default()
267 }),
268 completion: Some(CompletionClientCapabilities {
269 completion_item: Some(CompletionItemCapability {
270 snippet_support: Some(true),
271 resolve_support: Some(CompletionItemCapabilityResolveSupport {
272 properties: vec!["additionalTextEdits".to_string()],
273 }),
274 ..Default::default()
275 }),
276 ..Default::default()
277 }),
278 ..Default::default()
279 }),
280 experimental: Some(json!({
281 "serverStatusNotification": true,
282 })),
283 window: Some(WindowClientCapabilities {
284 work_done_progress: Some(true),
285 ..Default::default()
286 }),
287 ..Default::default()
288 },
289 trace: Default::default(),
290 workspace_folders: Default::default(),
291 client_info: Default::default(),
292 locale: Default::default(),
293 };
294
295 let this = self.clone();
296 let request = Self::request_internal::<request::Initialize>(
297 &this.next_id,
298 &this.response_handlers,
299 this.outbound_tx.read().as_ref(),
300 params,
301 );
302 let response = request.await?;
303 Self::notify_internal::<notification::Initialized>(
304 this.outbound_tx.read().as_ref(),
305 InitializedParams {},
306 )?;
307 Ok(response.capabilities)
308 }
309
310 pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Result<()>>> {
311 if let Some(tasks) = self.io_tasks.lock().take() {
312 let response_handlers = self.response_handlers.clone();
313 let outbound_tx = self.outbound_tx.write().take();
314 let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
315 let mut output_done = self.output_done_rx.lock().take().unwrap();
316 Some(async move {
317 Self::request_internal::<request::Shutdown>(
318 &next_id,
319 &response_handlers,
320 outbound_tx.as_ref(),
321 (),
322 )
323 .await?;
324 Self::notify_internal::<notification::Exit>(outbound_tx.as_ref(), ())?;
325 drop(outbound_tx);
326 output_done.recv().await;
327 drop(tasks);
328 Ok(())
329 })
330 } else {
331 None
332 }
333 }
334
335 pub fn on_notification<T, F>(&self, mut f: F) -> Subscription
336 where
337 T: notification::Notification,
338 F: 'static + Send + Sync + FnMut(T::Params),
339 {
340 let prev_handler = self.notification_handlers.write().insert(
341 T::METHOD,
342 Box::new(
343 move |notification| match serde_json::from_str(notification) {
344 Ok(notification) => f(notification),
345 Err(err) => log::error!("error parsing notification {}: {}", T::METHOD, err),
346 },
347 ),
348 );
349
350 assert!(
351 prev_handler.is_none(),
352 "registered multiple handlers for the same notification"
353 );
354
355 Subscription {
356 method: T::METHOD,
357 notification_handlers: self.notification_handlers.clone(),
358 }
359 }
360
    /// Returns a receiver for the server's capabilities; it holds `None` until the
    /// `initialize` request has succeeded.
    pub fn capabilities(&self) -> watch::Receiver<Option<ServerCapabilities>> {
        self.capabilities.clone()
    }
364
365 pub fn request<T: request::Request>(
366 self: &Arc<Self>,
367 params: T::Params,
368 ) -> impl Future<Output = Result<T::Result>>
369 where
370 T::Result: 'static + Send,
371 {
372 let this = self.clone();
373 async move {
374 this.initialized.clone().recv().await;
375 Self::request_internal::<T>(
376 &this.next_id,
377 &this.response_handlers,
378 this.outbound_tx.read().as_ref(),
379 params,
380 )
381 .await
382 }
383 }
384
385 fn request_internal<T: request::Request>(
386 next_id: &AtomicUsize,
387 response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
388 outbound_tx: Option<&channel::Sender<Vec<u8>>>,
389 params: T::Params,
390 ) -> impl 'static + Future<Output = Result<T::Result>>
391 where
392 T::Result: 'static + Send,
393 {
394 let id = next_id.fetch_add(1, SeqCst);
395 let message = serde_json::to_vec(&Request {
396 jsonrpc: JSON_RPC_VERSION,
397 id,
398 method: T::METHOD,
399 params,
400 })
401 .unwrap();
402 let mut response_handlers = response_handlers.lock();
403 let (mut tx, mut rx) = oneshot::channel();
404 response_handlers.insert(
405 id,
406 Box::new(move |result| {
407 let response = match result {
408 Ok(response) => {
409 serde_json::from_str(response).context("failed to deserialize response")
410 }
411 Err(error) => Err(anyhow!("{}", error.message)),
412 };
413 let _ = tx.try_send(response);
414 }),
415 );
416
417 let send = outbound_tx
418 .as_ref()
419 .ok_or_else(|| {
420 anyhow!("tried to send a request to a language server that has been shut down")
421 })
422 .and_then(|outbound_tx| {
423 outbound_tx
424 .try_send(message)
425 .context("failed to write to language server's stdin")?;
426 Ok(())
427 });
428 async move {
429 send?;
430 rx.recv().await.unwrap()
431 }
432 }
433
434 pub fn notify<T: notification::Notification>(
435 self: &Arc<Self>,
436 params: T::Params,
437 ) -> impl Future<Output = Result<()>> {
438 let this = self.clone();
439 async move {
440 this.initialized.clone().recv().await;
441 Self::notify_internal::<T>(this.outbound_tx.read().as_ref(), params)?;
442 Ok(())
443 }
444 }
445
446 fn notify_internal<T: notification::Notification>(
447 outbound_tx: Option<&channel::Sender<Vec<u8>>>,
448 params: T::Params,
449 ) -> Result<()> {
450 let message = serde_json::to_vec(&Notification {
451 jsonrpc: JSON_RPC_VERSION,
452 method: T::METHOD,
453 params,
454 })
455 .unwrap();
456 let outbound_tx = outbound_tx
457 .as_ref()
458 .ok_or_else(|| anyhow!("tried to notify a language server that has been shut down"))?;
459 outbound_tx.try_send(message)?;
460 Ok(())
461 }
462}
463
464impl Drop for LanguageServer {
465 fn drop(&mut self) {
466 if let Some(shutdown) = self.shutdown() {
467 self.executor.spawn(shutdown).detach();
468 }
469 }
470}
471
impl Subscription {
    /// Consumes the subscription while leaving its handler registered.
    pub fn detach(mut self) {
        // `Drop` removes whatever key `self.method` names; pointing it at the empty
        // string makes the drop at the end of this function remove that key instead of
        // the real method's handler.
        self.method = "";
    }
}
477
impl Drop for Subscription {
    /// Removes the handler registered under `self.method` from the shared handler map.
    fn drop(&mut self) {
        self.notification_handlers.write().remove(self.method);
    }
}
483
/// An in-process stand-in for a language server, available in tests and under the
/// `test-support` feature.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeLanguageServer {
    /// Request handlers keyed by method name. Each receives the request id and the raw
    /// JSON parameters and returns the serialized response.
    handlers:
        Arc<Mutex<HashMap<&'static str, Box<dyn Send + Sync + FnMut(usize, &[u8]) -> Vec<u8>>>>>,
    /// Serialized messages waiting to be written to the client.
    outgoing_tx: futures::channel::mpsc::UnboundedSender<Vec<u8>>,
    /// Messages received from the client that did not decode as requests.
    incoming_rx: futures::channel::mpsc::UnboundedReceiver<Vec<u8>>,
}
491
#[cfg(any(test, feature = "test-support"))]
impl LanguageServer {
    /// Creates a `LanguageServer` connected to a `FakeLanguageServer` that reports
    /// default capabilities.
    pub fn fake(executor: Arc<gpui::executor::Background>) -> (Arc<Self>, FakeLanguageServer) {
        Self::fake_with_capabilities(ServerCapabilities::default(), executor)
    }

    /// Creates a `LanguageServer` connected through in-memory pipes to a
    /// `FakeLanguageServer` that answers `initialize` with the given `capabilities`.
    ///
    /// # Panics
    /// Panics if constructing the `LanguageServer` fails.
    pub fn fake_with_capabilities(
        capabilities: ServerCapabilities,
        executor: Arc<gpui::executor::Background>,
    ) -> (Arc<Self>, FakeLanguageServer) {
        let (client_to_server_tx, client_to_server_rx) = async_pipe::pipe();
        let (server_to_client_tx, server_to_client_rx) = async_pipe::pipe();

        let mut fake_server =
            FakeLanguageServer::new(executor.clone(), client_to_server_rx, server_to_client_tx);
        // The handler must be in place before the client is created, because the client
        // starts its handshake as soon as it is constructed.
        fake_server.handle_request::<request::Initialize, _>(move |_| InitializeResult {
            capabilities: capabilities.clone(),
            ..Default::default()
        });

        let language_server = Self::new_internal(
            client_to_server_tx,
            server_to_client_rx,
            Path::new("/"),
            executor,
        )
        .unwrap();

        (language_server, fake_server)
    }
}
520
521#[cfg(any(test, feature = "test-support"))]
522impl FakeLanguageServer {
523 fn new(
524 background: Arc<gpui::executor::Background>,
525 stdin: async_pipe::PipeReader,
526 stdout: async_pipe::PipeWriter,
527 ) -> Self {
528 use futures::StreamExt as _;
529
530 let (incoming_tx, incoming_rx) = futures::channel::mpsc::unbounded();
531 let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded();
532 let this = Self {
533 outgoing_tx: outgoing_tx.clone(),
534 incoming_rx,
535 handlers: Default::default(),
536 };
537
538 // Receive incoming messages
539 let handlers = this.handlers.clone();
540 let executor = background.clone();
541 background
542 .spawn(async move {
543 let mut buffer = Vec::new();
544 let mut stdin = smol::io::BufReader::new(stdin);
545 while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
546 executor.simulate_random_delay().await;
547 if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
548 assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
549
550 if let Some(handler) = handlers.lock().get_mut(request.method) {
551 let response = handler(request.id, request.params.get().as_bytes());
552 log::debug!("handled lsp request. method:{}", request.method);
553 outgoing_tx.unbounded_send(response)?;
554 } else {
555 log::debug!("unhandled lsp request. method:{}", request.method);
556 outgoing_tx.unbounded_send(
557 serde_json::to_vec(&AnyResponse {
558 id: request.id,
559 error: Some(Error {
560 message: "no handler".to_string(),
561 }),
562 result: None,
563 })
564 .unwrap(),
565 )?;
566 }
567 } else {
568 incoming_tx.unbounded_send(buffer.clone())?;
569 }
570 }
571 Ok::<_, anyhow::Error>(())
572 })
573 .detach();
574
575 // Send outgoing messages
576 background
577 .spawn(async move {
578 let mut stdout = smol::io::BufWriter::new(stdout);
579 while let Some(notification) = outgoing_rx.next().await {
580 Self::send(&mut stdout, ¬ification).await;
581 }
582 })
583 .detach();
584
585 this
586 }
587
588 pub async fn notify<T: notification::Notification>(&mut self, params: T::Params) {
589 let message = serde_json::to_vec(&Notification {
590 jsonrpc: JSON_RPC_VERSION,
591 method: T::METHOD,
592 params,
593 })
594 .unwrap();
595 self.outgoing_tx.unbounded_send(message).unwrap();
596 }
597
598 pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
599 use futures::StreamExt as _;
600
601 loop {
602 let bytes = self.incoming_rx.next().await.unwrap();
603 if let Ok(notification) = serde_json::from_slice::<Notification<T::Params>>(&bytes) {
604 assert_eq!(notification.method, T::METHOD);
605 return notification.params;
606 } else {
607 log::info!(
608 "skipping message in fake language server {:?}",
609 std::str::from_utf8(&bytes)
610 );
611 }
612 }
613 }
614
615 pub fn handle_request<T, F>(
616 &mut self,
617 mut handler: F,
618 ) -> futures::channel::mpsc::UnboundedReceiver<()>
619 where
620 T: 'static + request::Request,
621 F: 'static + Send + Sync + FnMut(T::Params) -> T::Result,
622 {
623 let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
624 self.handlers.lock().insert(
625 T::METHOD,
626 Box::new(move |id, params| {
627 let result = handler(serde_json::from_slice::<T::Params>(params).unwrap());
628 let result = serde_json::to_string(&result).unwrap();
629 let result = serde_json::from_str::<&RawValue>(&result).unwrap();
630 let response = AnyResponse {
631 id,
632 error: None,
633 result: Some(result),
634 };
635 responded_tx.unbounded_send(()).ok();
636 serde_json::to_vec(&response).unwrap()
637 }),
638 );
639 responded_rx
640 }
641
    /// Removes the handler registered for requests of type `T`, if there is one.
    pub fn remove_request_handler<T>(&mut self)
    where
        T: 'static + request::Request,
    {
        self.handlers.lock().remove(T::METHOD);
    }
648
649 pub async fn start_progress(&mut self, token: impl Into<String>) {
650 self.notify::<notification::Progress>(ProgressParams {
651 token: NumberOrString::String(token.into()),
652 value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
653 })
654 .await;
655 }
656
657 pub async fn end_progress(&mut self, token: impl Into<String>) {
658 self.notify::<notification::Progress>(ProgressParams {
659 token: NumberOrString::String(token.into()),
660 value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
661 })
662 .await;
663 }
664
665 async fn send(stdout: &mut smol::io::BufWriter<async_pipe::PipeWriter>, message: &[u8]) {
666 stdout
667 .write_all(CONTENT_LEN_HEADER.as_bytes())
668 .await
669 .unwrap();
670 stdout
671 .write_all((format!("{}", message.len())).as_bytes())
672 .await
673 .unwrap();
674 stdout.write_all("\r\n\r\n".as_bytes()).await.unwrap();
675 stdout.write_all(&message).await.unwrap();
676 stdout.flush().await.unwrap();
677 }
678
679 async fn receive(
680 stdin: &mut smol::io::BufReader<async_pipe::PipeReader>,
681 buffer: &mut Vec<u8>,
682 ) -> Result<()> {
683 buffer.clear();
684 stdin.read_until(b'\n', buffer).await?;
685 stdin.read_until(b'\n', buffer).await?;
686 let message_len: usize = std::str::from_utf8(buffer)
687 .unwrap()
688 .strip_prefix(CONTENT_LEN_HEADER)
689 .unwrap()
690 .trim_end()
691 .parse()
692 .unwrap();
693 buffer.resize(message_len, 0);
694 stdin.read_exact(buffer).await?;
695 Ok(())
696 }
697}
698
699#[cfg(test)]
700mod tests {
701 use super::*;
702 use gpui::TestAppContext;
703 use unindent::Unindent;
704 use util::test::temp_tree;
705
    /// Initializes `env_logger`, but only when the `RUST_LOG` environment variable is set.
    ///
    /// Registered through `ctor` — presumably so that it runs once before any test; confirm
    /// against the `ctor` crate's documentation.
    #[ctor::ctor]
    fn init_logger() {
        if std::env::var("RUST_LOG").is_ok() {
            env_logger::init();
        }
    }
712
    /// End-to-end test against a real server: spawns the `rust-analyzer` executable on a
    /// temporary crate and checks the hover result for a string literal.
    #[gpui::test]
    async fn test_rust_analyzer(cx: TestAppContext) {
        let lib_source = r#"
            fn fun() {
                let hello = "world";
            }
        "#
        .unindent();
        let root_dir = temp_tree(json!({
            "Cargo.toml": r#"
                [package]
                name = "temp"
                version = "0.1.0"
                edition = "2018"
            "#.unindent(),
            "src": {
                "lib.rs": &lib_source
            }
        }));
        let lib_file_uri = Url::from_file_path(root_dir.path().join("src/lib.rs")).unwrap();

        let server =
            LanguageServer::new(Path::new("rust-analyzer"), root_dir.path(), cx.background())
                .unwrap();
        // Wait until the server reports that it is quiescent before talking to it.
        server.next_idle_notification().await;

        server
            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
                text_document: TextDocumentItem::new(
                    lib_file_uri.clone(),
                    "rust".to_string(),
                    0,
                    lib_source,
                ),
            })
            .await
            .unwrap();

        // Hover at line 1, column 21 of `lib_source`.
        let hover = server
            .request::<request::HoverRequest>(HoverParams {
                text_document_position_params: TextDocumentPositionParams {
                    text_document: TextDocumentIdentifier::new(lib_file_uri),
                    position: Position::new(1, 21),
                },
                work_done_progress_params: Default::default(),
            })
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            hover.contents,
            HoverContents::Markup(MarkupContent {
                kind: MarkupKind::PlainText,
                value: "&str".to_string()
            })
        );
    }
770
    /// Exercises the client against `FakeLanguageServer`: a client-to-server notification,
    /// two server-to-client notifications, and the shutdown sequence triggered by dropping
    /// the client.
    #[gpui::test]
    async fn test_fake(cx: TestAppContext) {
        let (server, mut fake) = LanguageServer::fake(cx.background());

        // Forward the notifications the client receives into channels the test can await.
        let (message_tx, message_rx) = channel::unbounded();
        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
        server
            .on_notification::<notification::ShowMessage, _>(move |params| {
                message_tx.try_send(params).unwrap()
            })
            .detach();
        server
            .on_notification::<notification::PublishDiagnostics, _>(move |params| {
                diagnostics_tx.try_send(params).unwrap()
            })
            .detach();

        // Client -> server.
        server
            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
                text_document: TextDocumentItem::new(
                    Url::from_str("file://a/b").unwrap(),
                    "rust".to_string(),
                    0,
                    "".to_string(),
                ),
            })
            .await
            .unwrap();
        assert_eq!(
            fake.receive_notification::<notification::DidOpenTextDocument>()
                .await
                .text_document
                .uri
                .as_str(),
            "file://a/b"
        );

        // Server -> client.
        fake.notify::<notification::ShowMessage>(ShowMessageParams {
            typ: MessageType::ERROR,
            message: "ok".to_string(),
        })
        .await;
        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
            uri: Url::from_str("file://b/c").unwrap(),
            version: Some(5),
            diagnostics: vec![],
        })
        .await;
        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
        assert_eq!(
            diagnostics_rx.recv().await.unwrap().uri.as_str(),
            "file://b/c"
        );

        // Dropping the client sends a shutdown request, which the fake must answer before
        // the exit notification is sent.
        fake.handle_request::<request::Shutdown, _>(|_| ());

        drop(server);
        fake.receive_notification::<notification::Exit>().await;
    }
830
831 impl LanguageServer {
832 async fn next_idle_notification(self: &Arc<Self>) {
833 let (tx, rx) = channel::unbounded();
834 let _subscription =
835 self.on_notification::<ServerStatusNotification, _>(move |params| {
836 if params.quiescent {
837 tx.try_send(()).unwrap();
838 }
839 });
840 let _ = rx.recv().await;
841 }
842 }
843
    /// Marker type for the `experimental/serverStatus` notification.
    pub enum ServerStatusNotification {}

    impl notification::Notification for ServerStatusNotification {
        type Params = ServerStatusParams;
        const METHOD: &'static str = "experimental/serverStatus";
    }
850
    /// Parameters of the `experimental/serverStatus` notification; only the `quiescent`
    /// flag is modelled.
    #[derive(Deserialize, Serialize, PartialEq, Eq, Clone)]
    pub struct ServerStatusParams {
        pub quiescent: bool,
    }
855}