1use anyhow::{anyhow, Context, Result};
2use collections::HashMap;
3use futures::{channel::oneshot, io::BufWriter, AsyncRead, AsyncWrite};
4use gpui::{executor, Task};
5use parking_lot::{Mutex, RwLock};
6use postage::{barrier, prelude::Stream, watch};
7use serde::{Deserialize, Serialize};
8use serde_json::{json, value::RawValue, Value};
9use smol::{
10 channel,
11 io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
12 process::Command,
13};
14use std::{
15 future::Future,
16 io::Write,
17 str::FromStr,
18 sync::{
19 atomic::{AtomicUsize, Ordering::SeqCst},
20 Arc,
21 },
22};
23use std::{path::Path, process::Stdio};
24use util::TryFutureExt;
25
26pub use lsp_types::*;
27
/// Protocol version placed in the `jsonrpc` field of every request and notification
/// built in this file.
const JSON_RPC_VERSION: &str = "2.0";
/// Prefix of the header line that precedes each message body and carries the body's
/// length in bytes.
const CONTENT_LEN_HEADER: &str = "Content-Length: ";
30
/// Callback invoked with the raw JSON text of an incoming notification's `params`.
type NotificationHandler = Box<dyn Send + Sync + FnMut(&str)>;
/// One-shot callback invoked with either the raw JSON text of a response's `result`
/// or the error reported in the response.
type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
33
/// Client-side handle to a language server reached over a pair of byte streams.
pub struct LanguageServer {
    /// Source of ids for outgoing requests; advanced by one per request.
    next_id: AtomicUsize,
    /// Queue of serialized message bodies; the output task frames them and writes them
    /// to the server's input stream.
    outbound_tx: channel::Sender<Vec<u8>>,
    /// Holds `Some(capabilities)` once the `initialize` request has succeeded.
    capabilities: watch::Receiver<Option<ServerCapabilities>>,
    /// Handlers for incoming notifications, keyed by method name.
    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
    /// Handlers for responses that are still outstanding, keyed by request id.
    response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
    /// Background executor; used on drop to run the shutdown future.
    executor: Arc<executor::Background>,
    /// The input and output tasks, in that order. Taken (set to `None`) by `shutdown`.
    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
    /// Barrier released when the initialization task finishes, whether or not
    /// initialization succeeded. `request` and `notify` wait on it before sending.
    initialized: barrier::Receiver,
    /// Barrier released when the output task finishes. Taken by `shutdown`.
    output_done_rx: Mutex<Option<barrier::Receiver>>,
}
45
/// Guard returned by `LanguageServer::on_notification`.
///
/// Dropping it removes the handler registered under `method`; call `detach` to keep
/// the handler installed instead.
pub struct Subscription {
    /// Method name whose handler is removed on drop.
    method: &'static str,
    /// Shared handler table the subscription was registered in.
    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
}
50
/// Wire representation of an outgoing request with typed `params`.
#[derive(Serialize, Deserialize)]
struct Request<'a, T> {
    /// Protocol version string.
    jsonrpc: &'a str,
    /// Identifier used to match the eventual response to this request.
    id: usize,
    /// Name of the method being invoked.
    method: &'a str,
    /// Method-specific parameters.
    params: T,
}
58
/// Borrowed view of a request whose `params` are left unparsed.
///
/// Only compiled for tests / the `test-support` feature, where the fake server uses
/// it to decode whatever the client sent.
#[cfg(any(test, feature = "test-support"))]
#[derive(Deserialize)]
struct AnyRequest<'a> {
    /// Identifier to echo back in the response.
    id: usize,
    /// Protocol version string as sent by the client.
    #[serde(borrow)]
    jsonrpc: &'a str,
    /// Name of the requested method.
    #[serde(borrow)]
    method: &'a str,
    /// Raw, still-serialized parameters.
    #[serde(borrow)]
    params: &'a RawValue,
}
70
/// Borrowed view of a response whose `result` is left unparsed.
#[derive(Serialize, Deserialize)]
struct AnyResponse<'a> {
    /// Id of the request this response answers.
    id: usize,
    /// Error reported for the request; treated as absent when the field is missing.
    #[serde(default)]
    error: Option<Error>,
    /// Raw, still-serialized result, if any.
    #[serde(borrow)]
    result: Option<&'a RawValue>,
}
79
/// Wire representation of a notification (a message without an id) with typed `params`.
#[derive(Serialize, Deserialize)]
struct Notification<'a, T> {
    /// Protocol version string.
    #[serde(borrow)]
    jsonrpc: &'a str,
    /// Name of the notification method.
    #[serde(borrow)]
    method: &'a str,
    /// Method-specific parameters.
    params: T,
}
88
/// Borrowed view of an incoming notification whose `params` are left unparsed so they
/// can be handed to the handler registered for `method`.
#[derive(Deserialize)]
struct AnyNotification<'a> {
    /// Name of the notification method.
    #[serde(borrow)]
    method: &'a str,
    /// Raw, still-serialized parameters.
    #[serde(borrow)]
    params: &'a RawValue,
}
96
/// Error object carried by a response; only the `message` field is modeled here.
#[derive(Debug, Serialize, Deserialize)]
struct Error {
    /// Human-readable description of the failure.
    message: String,
}
101
102impl LanguageServer {
103 pub fn new(
104 binary_path: &Path,
105 root_path: &Path,
106 background: Arc<executor::Background>,
107 ) -> Result<Arc<Self>> {
108 let mut server = Command::new(binary_path)
109 .stdin(Stdio::piped())
110 .stdout(Stdio::piped())
111 .stderr(Stdio::inherit())
112 .spawn()?;
113 let stdin = server.stdin.take().unwrap();
114 let stdout = server.stdout.take().unwrap();
115 Self::new_internal(stdin, stdout, root_path, background)
116 }
117
118 fn new_internal<Stdin, Stdout>(
119 stdin: Stdin,
120 stdout: Stdout,
121 root_path: &Path,
122 executor: Arc<executor::Background>,
123 ) -> Result<Arc<Self>>
124 where
125 Stdin: AsyncWrite + Unpin + Send + 'static,
126 Stdout: AsyncRead + Unpin + Send + 'static,
127 {
128 let mut stdin = BufWriter::new(stdin);
129 let mut stdout = BufReader::new(stdout);
130 let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
131 let notification_handlers =
132 Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::default()));
133 let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::default()));
134 let input_task = executor.spawn(
135 {
136 let notification_handlers = notification_handlers.clone();
137 let response_handlers = response_handlers.clone();
138 async move {
139 let _clear_response_handlers = ClearResponseHandlers(response_handlers.clone());
140 let mut buffer = Vec::new();
141 loop {
142 buffer.clear();
143 stdout.read_until(b'\n', &mut buffer).await?;
144 stdout.read_until(b'\n', &mut buffer).await?;
145 let message_len: usize = std::str::from_utf8(&buffer)?
146 .strip_prefix(CONTENT_LEN_HEADER)
147 .ok_or_else(|| anyhow!("invalid header"))?
148 .trim_end()
149 .parse()?;
150
151 buffer.resize(message_len, 0);
152 stdout.read_exact(&mut buffer).await?;
153
154 if let Ok(AnyNotification { method, params }) =
155 serde_json::from_slice(&buffer)
156 {
157 if let Some(handler) = notification_handlers.write().get_mut(method) {
158 handler(params.get());
159 } else {
160 log::info!(
161 "unhandled notification {}:\n{}",
162 method,
163 serde_json::to_string_pretty(
164 &Value::from_str(params.get()).unwrap()
165 )
166 .unwrap()
167 );
168 }
169 } else if let Ok(AnyResponse { id, error, result }) =
170 serde_json::from_slice(&buffer)
171 {
172 if let Some(handler) = response_handlers.lock().remove(&id) {
173 if let Some(error) = error {
174 handler(Err(error));
175 } else if let Some(result) = result {
176 handler(Ok(result.get()));
177 } else {
178 handler(Ok("null"));
179 }
180 }
181 } else {
182 return Err(anyhow!(
183 "failed to deserialize message:\n{}",
184 std::str::from_utf8(&buffer)?
185 ));
186 }
187 }
188 }
189 }
190 .log_err(),
191 );
192 let (output_done_tx, output_done_rx) = barrier::channel();
193 let output_task = executor.spawn({
194 let response_handlers = response_handlers.clone();
195 async move {
196 let _clear_response_handlers = ClearResponseHandlers(response_handlers);
197 let mut content_len_buffer = Vec::new();
198 while let Ok(message) = outbound_rx.recv().await {
199 content_len_buffer.clear();
200 write!(content_len_buffer, "{}", message.len()).unwrap();
201 stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
202 stdin.write_all(&content_len_buffer).await?;
203 stdin.write_all("\r\n\r\n".as_bytes()).await?;
204 stdin.write_all(&message).await?;
205 stdin.flush().await?;
206 }
207 drop(output_done_tx);
208 Ok(())
209 }
210 .log_err()
211 });
212
213 let (initialized_tx, initialized_rx) = barrier::channel();
214 let (mut capabilities_tx, capabilities_rx) = watch::channel();
215 let this = Arc::new(Self {
216 notification_handlers,
217 response_handlers,
218 capabilities: capabilities_rx,
219 next_id: Default::default(),
220 outbound_tx,
221 executor: executor.clone(),
222 io_tasks: Mutex::new(Some((input_task, output_task))),
223 initialized: initialized_rx,
224 output_done_rx: Mutex::new(Some(output_done_rx)),
225 });
226
227 let root_uri = Url::from_file_path(root_path).map_err(|_| anyhow!("invalid root path"))?;
228 executor
229 .spawn({
230 let this = this.clone();
231 async move {
232 if let Some(capabilities) = this.init(root_uri).log_err().await {
233 *capabilities_tx.borrow_mut() = Some(capabilities);
234 }
235
236 drop(initialized_tx);
237 }
238 })
239 .detach();
240
241 Ok(this)
242 }
243
244 async fn init(self: Arc<Self>, root_uri: Url) -> Result<ServerCapabilities> {
245 #[allow(deprecated)]
246 let params = InitializeParams {
247 process_id: Default::default(),
248 root_path: Default::default(),
249 root_uri: Some(root_uri),
250 initialization_options: Default::default(),
251 capabilities: ClientCapabilities {
252 text_document: Some(TextDocumentClientCapabilities {
253 definition: Some(GotoCapability {
254 link_support: Some(true),
255 ..Default::default()
256 }),
257 code_action: Some(CodeActionClientCapabilities {
258 code_action_literal_support: Some(CodeActionLiteralSupport {
259 code_action_kind: CodeActionKindLiteralSupport {
260 value_set: vec![
261 CodeActionKind::REFACTOR.as_str().into(),
262 CodeActionKind::QUICKFIX.as_str().into(),
263 ],
264 },
265 }),
266 data_support: Some(true),
267 resolve_support: Some(CodeActionCapabilityResolveSupport {
268 properties: vec!["edit".to_string()],
269 }),
270 ..Default::default()
271 }),
272 completion: Some(CompletionClientCapabilities {
273 completion_item: Some(CompletionItemCapability {
274 snippet_support: Some(true),
275 resolve_support: Some(CompletionItemCapabilityResolveSupport {
276 properties: vec!["additionalTextEdits".to_string()],
277 }),
278 ..Default::default()
279 }),
280 ..Default::default()
281 }),
282 ..Default::default()
283 }),
284 experimental: Some(json!({
285 "serverStatusNotification": true,
286 })),
287 window: Some(WindowClientCapabilities {
288 work_done_progress: Some(true),
289 ..Default::default()
290 }),
291 ..Default::default()
292 },
293 trace: Default::default(),
294 workspace_folders: Default::default(),
295 client_info: Default::default(),
296 locale: Default::default(),
297 };
298
299 let this = self.clone();
300 let request = Self::request_internal::<request::Initialize>(
301 &this.next_id,
302 &this.response_handlers,
303 &this.outbound_tx,
304 params,
305 );
306 let response = request.await?;
307 Self::notify_internal::<notification::Initialized>(
308 &this.outbound_tx,
309 InitializedParams {},
310 )?;
311 Ok(response.capabilities)
312 }
313
314 pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
315 if let Some(tasks) = self.io_tasks.lock().take() {
316 let response_handlers = self.response_handlers.clone();
317 let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
318 let outbound_tx = self.outbound_tx.clone();
319 let mut output_done = self.output_done_rx.lock().take().unwrap();
320 let shutdown_request = Self::request_internal::<request::Shutdown>(
321 &next_id,
322 &response_handlers,
323 &outbound_tx,
324 (),
325 );
326 let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
327 outbound_tx.close();
328 Some(
329 async move {
330 log::debug!("language server shutdown started");
331 shutdown_request.await?;
332 response_handlers.lock().clear();
333 exit?;
334 output_done.recv().await;
335 log::debug!("language server shutdown finished");
336 drop(tasks);
337 Ok(())
338 }
339 .log_err(),
340 )
341 } else {
342 None
343 }
344 }
345
346 pub fn on_notification<T, F>(&self, mut f: F) -> Subscription
347 where
348 T: notification::Notification,
349 F: 'static + Send + Sync + FnMut(T::Params),
350 {
351 let prev_handler = self.notification_handlers.write().insert(
352 T::METHOD,
353 Box::new(
354 move |notification| match serde_json::from_str(notification) {
355 Ok(notification) => f(notification),
356 Err(err) => log::error!("error parsing notification {}: {}", T::METHOD, err),
357 },
358 ),
359 );
360
361 assert!(
362 prev_handler.is_none(),
363 "registered multiple handlers for the same notification"
364 );
365
366 Subscription {
367 method: T::METHOD,
368 notification_handlers: self.notification_handlers.clone(),
369 }
370 }
371
372 pub fn capabilities(&self) -> watch::Receiver<Option<ServerCapabilities>> {
373 self.capabilities.clone()
374 }
375
376 pub fn request<T: request::Request>(
377 self: &Arc<Self>,
378 params: T::Params,
379 ) -> impl Future<Output = Result<T::Result>>
380 where
381 T::Result: 'static + Send,
382 {
383 let this = self.clone();
384 async move {
385 this.initialized.clone().recv().await;
386 Self::request_internal::<T>(
387 &this.next_id,
388 &this.response_handlers,
389 &this.outbound_tx,
390 params,
391 )
392 .await
393 }
394 }
395
396 fn request_internal<T: request::Request>(
397 next_id: &AtomicUsize,
398 response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
399 outbound_tx: &channel::Sender<Vec<u8>>,
400 params: T::Params,
401 ) -> impl 'static + Future<Output = Result<T::Result>>
402 where
403 T::Result: 'static + Send,
404 {
405 let id = next_id.fetch_add(1, SeqCst);
406 let message = serde_json::to_vec(&Request {
407 jsonrpc: JSON_RPC_VERSION,
408 id,
409 method: T::METHOD,
410 params,
411 })
412 .unwrap();
413
414 let send = outbound_tx
415 .try_send(message)
416 .context("failed to write to language server's stdin");
417
418 let (tx, rx) = oneshot::channel();
419 response_handlers.lock().insert(
420 id,
421 Box::new(move |result| {
422 let response = match result {
423 Ok(response) => {
424 serde_json::from_str(response).context("failed to deserialize response")
425 }
426 Err(error) => Err(anyhow!("{}", error.message)),
427 };
428 let _ = tx.send(response);
429 }),
430 );
431
432 async move {
433 send?;
434 rx.await?
435 }
436 }
437
438 pub fn notify<T: notification::Notification>(
439 self: &Arc<Self>,
440 params: T::Params,
441 ) -> impl Future<Output = Result<()>> {
442 let this = self.clone();
443 async move {
444 this.initialized.clone().recv().await;
445 Self::notify_internal::<T>(&this.outbound_tx, params)?;
446 Ok(())
447 }
448 }
449
450 fn notify_internal<T: notification::Notification>(
451 outbound_tx: &channel::Sender<Vec<u8>>,
452 params: T::Params,
453 ) -> Result<()> {
454 let message = serde_json::to_vec(&Notification {
455 jsonrpc: JSON_RPC_VERSION,
456 method: T::METHOD,
457 params,
458 })
459 .unwrap();
460 outbound_tx.try_send(message)?;
461 Ok(())
462 }
463}
464
465impl Drop for LanguageServer {
466 fn drop(&mut self) {
467 if let Some(shutdown) = self.shutdown() {
468 self.executor.spawn(shutdown).detach();
469 }
470 }
471}
472
473impl Subscription {
474 pub fn detach(mut self) {
475 self.method = "";
476 }
477}
478
479impl Drop for Subscription {
480 fn drop(&mut self) {
481 self.notification_handlers.write().remove(self.method);
482 }
483}
484
/// In-process stand-in for a language server, available in tests and with the
/// `test-support` feature.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeLanguageServer {
    /// Request handlers, keyed by method name.
    handlers: FakeLanguageServerHandlers,
    /// Serialized messages waiting to be written to the client.
    outgoing_tx: futures::channel::mpsc::UnboundedSender<Vec<u8>>,
    /// Messages received from the client that did not parse as requests.
    incoming_rx: futures::channel::mpsc::UnboundedReceiver<Vec<u8>>,
    /// Task that reads messages from the client; held so it lives as long as the fake.
    _input_task: Task<Result<()>>,
    /// Task that writes messages to the client; held so it lives as long as the fake.
    _output_task: Task<Result<()>>,
}
493
/// Shared table of the fake server's request handlers, keyed by method name.
///
/// Each handler receives the request id, the raw parameter bytes and an app context,
/// and returns the serialized response.
#[cfg(any(test, feature = "test-support"))]
type FakeLanguageServerHandlers = Arc<
    Mutex<
        HashMap<
            &'static str,
            Box<dyn Send + FnMut(usize, &[u8], gpui::AsyncAppContext) -> Vec<u8>>,
        >,
    >,
>;
503
#[cfg(any(test, feature = "test-support"))]
impl LanguageServer {
    /// Creates a client connected to a fake server that reports default capabilities.
    pub fn fake(cx: &mut gpui::MutableAppContext) -> (Arc<Self>, FakeLanguageServer) {
        let capabilities = ServerCapabilities::default();
        Self::fake_with_capabilities(capabilities, cx)
    }

    /// Creates a client connected through in-memory pipes to a fake server that
    /// answers the `initialize` request with the given `capabilities`.
    pub fn fake_with_capabilities(
        capabilities: ServerCapabilities,
        cx: &mut gpui::MutableAppContext,
    ) -> (Arc<Self>, FakeLanguageServer) {
        // One pipe per direction: client -> fake ("stdin") and fake -> client ("stdout").
        let (stdin_writer, stdin_reader) = async_pipe::pipe();
        let (stdout_writer, stdout_reader) = async_pipe::pipe();

        // The initialize handler is installed before the client exists, so it is in
        // place by the time the client's initialization task sends its request.
        let mut fake = FakeLanguageServer::new(stdin_reader, stdout_writer, cx);
        fake.handle_request::<request::Initialize, _>(move |_, _| InitializeResult {
            capabilities: capabilities.clone(),
            ..Default::default()
        });

        let executor = cx.background().clone();
        let server =
            Self::new_internal(stdin_writer, stdout_reader, Path::new("/"), executor).unwrap();

        (server, fake)
    }
}
537
538#[cfg(any(test, feature = "test-support"))]
539impl FakeLanguageServer {
    /// Creates a fake server that reads client messages from `stdin` and writes its
    /// own messages to `stdout`.
    ///
    /// Two tasks are spawned: one decodes incoming messages, answers requests using
    /// the registered handlers and forwards everything else to `incoming_rx`; the
    /// other frames and writes whatever is queued on `outgoing_tx`.
    fn new(
        stdin: async_pipe::PipeReader,
        stdout: async_pipe::PipeWriter,
        cx: &mut gpui::MutableAppContext,
    ) -> Self {
        use futures::StreamExt as _;

        let (incoming_tx, incoming_rx) = futures::channel::mpsc::unbounded();
        let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded();
        let handlers = FakeLanguageServerHandlers::default();

        let input_task = cx.spawn(|cx| {
            let handlers = handlers.clone();
            let outgoing_tx = outgoing_tx.clone();
            async move {
                let mut buffer = Vec::new();
                let mut stdin = smol::io::BufReader::new(stdin);
                // Any receive error ends the loop, and with it this task.
                while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
                    cx.background().simulate_random_delay().await;
                    if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
                        assert_eq!(request.jsonrpc, JSON_RPC_VERSION);

                        let response;
                        if let Some(handler) = handlers.lock().get_mut(request.method) {
                            response =
                                handler(request.id, request.params.get().as_bytes(), cx.clone());
                            log::debug!("handled lsp request. method:{}", request.method);
                        } else {
                            // No handler registered for this method: answer with an
                            // error response.
                            response = serde_json::to_vec(&AnyResponse {
                                id: request.id,
                                error: Some(Error {
                                    message: "no handler".to_string(),
                                }),
                                result: None,
                            })
                            .unwrap();
                            log::debug!("unhandled lsp request. method:{}", request.method);
                        }
                        outgoing_tx.unbounded_send(response)?;
                    } else {
                        // Not a request: queue the raw bytes for `receive_notification`.
                        incoming_tx.unbounded_send(buffer.clone())?;
                    }
                }
                Ok::<_, anyhow::Error>(())
            }
        });

        let output_task = cx.background().spawn(async move {
            let mut stdout = smol::io::BufWriter::new(stdout);
            while let Some(message) = outgoing_rx.next().await {
                // Same framing the client expects: length header, blank line, body.
                stdout
                    .write_all(CONTENT_LEN_HEADER.as_bytes())
                    .await
                    .unwrap();
                stdout
                    .write_all((format!("{}", message.len())).as_bytes())
                    .await
                    .unwrap();
                stdout.write_all("\r\n\r\n".as_bytes()).await.unwrap();
                stdout.write_all(&message).await.unwrap();
                stdout.flush().await.unwrap();
            }
            Ok(())
        });

        Self {
            outgoing_tx,
            incoming_rx,
            handlers,
            _input_task: input_task,
            _output_task: output_task,
        }
    }
613
614 pub async fn notify<T: notification::Notification>(&mut self, params: T::Params) {
615 let message = serde_json::to_vec(&Notification {
616 jsonrpc: JSON_RPC_VERSION,
617 method: T::METHOD,
618 params,
619 })
620 .unwrap();
621 self.outgoing_tx.unbounded_send(message).unwrap();
622 }
623
624 pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
625 use futures::StreamExt as _;
626
627 loop {
628 let bytes = self.incoming_rx.next().await.unwrap();
629 if let Ok(notification) = serde_json::from_slice::<Notification<T::Params>>(&bytes) {
630 assert_eq!(notification.method, T::METHOD);
631 return notification.params;
632 } else {
633 log::info!(
634 "skipping message in fake language server {:?}",
635 std::str::from_utf8(&bytes)
636 );
637 }
638 }
639 }
640
641 pub fn handle_request<T, F>(
642 &mut self,
643 mut handler: F,
644 ) -> futures::channel::mpsc::UnboundedReceiver<()>
645 where
646 T: 'static + request::Request,
647 F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> T::Result,
648 {
649 let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
650 self.handlers.lock().insert(
651 T::METHOD,
652 Box::new(move |id, params, cx| {
653 let result = handler(serde_json::from_slice::<T::Params>(params).unwrap(), cx);
654 let result = serde_json::to_string(&result).unwrap();
655 let result = serde_json::from_str::<&RawValue>(&result).unwrap();
656 let response = AnyResponse {
657 id,
658 error: None,
659 result: Some(result),
660 };
661 responded_tx.unbounded_send(()).ok();
662 serde_json::to_vec(&response).unwrap()
663 }),
664 );
665 responded_rx
666 }
667
668 pub fn remove_request_handler<T>(&mut self)
669 where
670 T: 'static + request::Request,
671 {
672 self.handlers.lock().remove(T::METHOD);
673 }
674
675 pub async fn start_progress(&mut self, token: impl Into<String>) {
676 self.notify::<notification::Progress>(ProgressParams {
677 token: NumberOrString::String(token.into()),
678 value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
679 })
680 .await;
681 }
682
683 pub async fn end_progress(&mut self, token: impl Into<String>) {
684 self.notify::<notification::Progress>(ProgressParams {
685 token: NumberOrString::String(token.into()),
686 value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
687 })
688 .await;
689 }
690
691 async fn receive(
692 stdin: &mut smol::io::BufReader<async_pipe::PipeReader>,
693 buffer: &mut Vec<u8>,
694 ) -> Result<()> {
695 buffer.clear();
696 stdin.read_until(b'\n', buffer).await?;
697 stdin.read_until(b'\n', buffer).await?;
698 let message_len: usize = std::str::from_utf8(buffer)
699 .unwrap()
700 .strip_prefix(CONTENT_LEN_HEADER)
701 .ok_or_else(|| anyhow!("invalid content length header"))?
702 .trim_end()
703 .parse()
704 .unwrap();
705 buffer.resize(message_len, 0);
706 stdin.read_exact(buffer).await?;
707 Ok(())
708 }
709}
710
/// Guard that empties the wrapped response-handler table when it is dropped.
struct ClearResponseHandlers(Arc<Mutex<HashMap<usize, ResponseHandler>>>);
712
713impl Drop for ClearResponseHandlers {
714 fn drop(&mut self) {
715 self.0.lock().clear();
716 }
717}
718
#[cfg(test)]
mod tests {
    use super::*;
    use gpui::TestAppContext;

    // Enables logging for the test binary, but only when `RUST_LOG` is set.
    #[ctor::ctor]
    fn init_logger() {
        if std::env::var("RUST_LOG").is_ok() {
            env_logger::init();
        }
    }

    // Exercises a client/fake-server pair end to end: a notification in each
    // direction, then the shutdown sequence triggered by dropping the client.
    #[gpui::test]
    async fn test_fake(cx: &mut TestAppContext) {
        let (server, mut fake) = cx.update(LanguageServer::fake);

        // Forward the notifications the client receives into channels the test can
        // await on.
        let (message_tx, message_rx) = channel::unbounded();
        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
        server
            .on_notification::<notification::ShowMessage, _>(move |params| {
                message_tx.try_send(params).unwrap()
            })
            .detach();
        server
            .on_notification::<notification::PublishDiagnostics, _>(move |params| {
                diagnostics_tx.try_send(params).unwrap()
            })
            .detach();

        // Client -> server.
        server
            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
                text_document: TextDocumentItem::new(
                    Url::from_str("file://a/b").unwrap(),
                    "rust".to_string(),
                    0,
                    "".to_string(),
                ),
            })
            .await
            .unwrap();
        assert_eq!(
            fake.receive_notification::<notification::DidOpenTextDocument>()
                .await
                .text_document
                .uri
                .as_str(),
            "file://a/b"
        );

        // Server -> client.
        fake.notify::<notification::ShowMessage>(ShowMessageParams {
            typ: MessageType::ERROR,
            message: "ok".to_string(),
        })
        .await;
        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
            uri: Url::from_str("file://b/c").unwrap(),
            version: Some(5),
            diagnostics: vec![],
        })
        .await;
        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
        assert_eq!(
            diagnostics_rx.recv().await.unwrap().uri.as_str(),
            "file://b/c"
        );

        // Dropping the client starts its shutdown sequence; the fake must answer the
        // shutdown request and should then see the exit notification.
        fake.handle_request::<request::Shutdown, _>(|_, _| ());

        drop(server);
        fake.receive_notification::<notification::Exit>().await;
    }

    // Notification type for the "experimental/serverStatus" method.
    pub enum ServerStatusNotification {}

    impl notification::Notification for ServerStatusNotification {
        type Params = ServerStatusParams;
        const METHOD: &'static str = "experimental/serverStatus";
    }

    // Parameters carried by `ServerStatusNotification`.
    #[derive(Deserialize, Serialize, PartialEq, Eq, Clone)]
    pub struct ServerStatusParams {
        pub quiescent: bool,
    }
}