1use anyhow::{anyhow, Context, Result};
2use futures::{io::BufWriter, AsyncRead, AsyncWrite};
3use gpui::{executor, Task};
4use parking_lot::{Mutex, RwLock};
5use postage::{barrier, oneshot, prelude::Stream, sink::Sink, watch};
6use serde::{Deserialize, Serialize};
7use serde_json::{json, value::RawValue, Value};
8use smol::{
9 channel,
10 io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
11 process::Command,
12};
13use std::{
14 collections::HashMap,
15 future::Future,
16 io::Write,
17 str::FromStr,
18 sync::{
19 atomic::{AtomicUsize, Ordering::SeqCst},
20 Arc,
21 },
22};
23use std::{path::Path, process::Stdio};
24use util::TryFutureExt;
25
26pub use lsp_types::*;
27
/// JSON-RPC protocol version written into the `jsonrpc` field of every outgoing message.
const JSON_RPC_VERSION: &str = "2.0";
/// Prefix of the header line that precedes each message body; the decimal byte length of the
/// body follows it directly.
const CONTENT_LEN_HEADER: &str = "Content-Length: ";
30
31type NotificationHandler = Box<dyn Send + Sync + FnMut(&str)>;
32type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
33
34pub struct LanguageServer {
35 next_id: AtomicUsize,
36 outbound_tx: RwLock<Option<channel::Sender<Vec<u8>>>>,
37 capabilities: watch::Receiver<Option<ServerCapabilities>>,
38 notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
39 response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
40 executor: Arc<executor::Background>,
41 io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
42 initialized: barrier::Receiver,
43 output_done_rx: Mutex<Option<barrier::Receiver>>,
44}
45
46pub struct Subscription {
47 method: &'static str,
48 notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
49}
50
51#[derive(Serialize, Deserialize)]
52struct Request<'a, T> {
53 jsonrpc: &'a str,
54 id: usize,
55 method: &'a str,
56 params: T,
57}
58
59#[derive(Serialize, Deserialize)]
60struct AnyResponse<'a> {
61 id: usize,
62 #[serde(default)]
63 error: Option<Error>,
64 #[serde(borrow)]
65 result: Option<&'a RawValue>,
66}
67
68#[derive(Serialize, Deserialize)]
69struct Notification<'a, T> {
70 #[serde(borrow)]
71 jsonrpc: &'a str,
72 #[serde(borrow)]
73 method: &'a str,
74 params: T,
75}
76
77#[derive(Deserialize)]
78struct AnyNotification<'a> {
79 #[serde(borrow)]
80 method: &'a str,
81 #[serde(borrow)]
82 params: &'a RawValue,
83}
84
85#[derive(Debug, Serialize, Deserialize)]
86struct Error {
87 message: String,
88}
89
90impl LanguageServer {
91 pub fn new(
92 binary_path: &Path,
93 root_path: &Path,
94 background: Arc<executor::Background>,
95 ) -> Result<Arc<Self>> {
96 let mut server = Command::new(binary_path)
97 .stdin(Stdio::piped())
98 .stdout(Stdio::piped())
99 .stderr(Stdio::inherit())
100 .spawn()?;
101 let stdin = server.stdin.take().unwrap();
102 let stdout = server.stdout.take().unwrap();
103 Self::new_internal(stdin, stdout, root_path, background)
104 }
105
106 fn new_internal<Stdin, Stdout>(
107 stdin: Stdin,
108 stdout: Stdout,
109 root_path: &Path,
110 executor: Arc<executor::Background>,
111 ) -> Result<Arc<Self>>
112 where
113 Stdin: AsyncWrite + Unpin + Send + 'static,
114 Stdout: AsyncRead + Unpin + Send + 'static,
115 {
116 let mut stdin = BufWriter::new(stdin);
117 let mut stdout = BufReader::new(stdout);
118 let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
119 let notification_handlers = Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::new()));
120 let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::new()));
121 let input_task = executor.spawn(
122 {
123 let notification_handlers = notification_handlers.clone();
124 let response_handlers = response_handlers.clone();
125 async move {
126 let mut buffer = Vec::new();
127 loop {
128 buffer.clear();
129 stdout.read_until(b'\n', &mut buffer).await?;
130 stdout.read_until(b'\n', &mut buffer).await?;
131 let message_len: usize = std::str::from_utf8(&buffer)?
132 .strip_prefix(CONTENT_LEN_HEADER)
133 .ok_or_else(|| anyhow!("invalid header"))?
134 .trim_end()
135 .parse()?;
136
137 buffer.resize(message_len, 0);
138 stdout.read_exact(&mut buffer).await?;
139
140 if let Ok(AnyNotification { method, params }) =
141 serde_json::from_slice(&buffer)
142 {
143 if let Some(handler) = notification_handlers.write().get_mut(method) {
144 handler(params.get());
145 } else {
146 log::info!(
147 "unhandled notification {}:\n{}",
148 method,
149 serde_json::to_string_pretty(
150 &Value::from_str(params.get()).unwrap()
151 )
152 .unwrap()
153 );
154 }
155 } else if let Ok(AnyResponse { id, error, result }) =
156 serde_json::from_slice(&buffer)
157 {
158 if let Some(handler) = response_handlers.lock().remove(&id) {
159 if let Some(error) = error {
160 handler(Err(error));
161 } else if let Some(result) = result {
162 handler(Ok(result.get()));
163 } else {
164 handler(Ok("null"));
165 }
166 }
167 } else {
168 return Err(anyhow!(
169 "failed to deserialize message:\n{}",
170 std::str::from_utf8(&buffer)?
171 ));
172 }
173 }
174 }
175 }
176 .log_err(),
177 );
178 let (output_done_tx, output_done_rx) = barrier::channel();
179 let output_task = executor.spawn(
180 async move {
181 let mut content_len_buffer = Vec::new();
182 while let Ok(message) = outbound_rx.recv().await {
183 content_len_buffer.clear();
184 write!(content_len_buffer, "{}", message.len()).unwrap();
185 stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
186 stdin.write_all(&content_len_buffer).await?;
187 stdin.write_all("\r\n\r\n".as_bytes()).await?;
188 stdin.write_all(&message).await?;
189 stdin.flush().await?;
190 }
191 drop(output_done_tx);
192 Ok(())
193 }
194 .log_err(),
195 );
196
197 let (initialized_tx, initialized_rx) = barrier::channel();
198 let (mut capabilities_tx, capabilities_rx) = watch::channel();
199 let this = Arc::new(Self {
200 notification_handlers,
201 response_handlers,
202 capabilities: capabilities_rx,
203 next_id: Default::default(),
204 outbound_tx: RwLock::new(Some(outbound_tx)),
205 executor: executor.clone(),
206 io_tasks: Mutex::new(Some((input_task, output_task))),
207 initialized: initialized_rx,
208 output_done_rx: Mutex::new(Some(output_done_rx)),
209 });
210
211 let root_uri = Url::from_file_path(root_path).map_err(|_| anyhow!("invalid root path"))?;
212 executor
213 .spawn({
214 let this = this.clone();
215 async move {
216 if let Some(capabilities) = this.init(root_uri).log_err().await {
217 *capabilities_tx.borrow_mut() = Some(capabilities);
218 }
219
220 drop(initialized_tx);
221 }
222 })
223 .detach();
224
225 Ok(this)
226 }
227
228 async fn init(self: Arc<Self>, root_uri: Url) -> Result<ServerCapabilities> {
229 #[allow(deprecated)]
230 let params = InitializeParams {
231 process_id: Default::default(),
232 root_path: Default::default(),
233 root_uri: Some(root_uri),
234 initialization_options: Default::default(),
235 capabilities: ClientCapabilities {
236 text_document: Some(TextDocumentClientCapabilities {
237 definition: Some(GotoCapability {
238 link_support: Some(true),
239 ..Default::default()
240 }),
241 code_action: Some(CodeActionClientCapabilities {
242 code_action_literal_support: Some(CodeActionLiteralSupport {
243 code_action_kind: CodeActionKindLiteralSupport {
244 value_set: vec![
245 CodeActionKind::REFACTOR.as_str().into(),
246 CodeActionKind::QUICKFIX.as_str().into(),
247 ],
248 },
249 }),
250 data_support: Some(true),
251 resolve_support: Some(CodeActionCapabilityResolveSupport {
252 properties: vec!["edit".to_string()],
253 }),
254 ..Default::default()
255 }),
256 completion: Some(CompletionClientCapabilities {
257 completion_item: Some(CompletionItemCapability {
258 snippet_support: Some(true),
259 resolve_support: Some(CompletionItemCapabilityResolveSupport {
260 properties: vec!["additionalTextEdits".to_string()],
261 }),
262 ..Default::default()
263 }),
264 ..Default::default()
265 }),
266 ..Default::default()
267 }),
268 experimental: Some(json!({
269 "serverStatusNotification": true,
270 })),
271 window: Some(WindowClientCapabilities {
272 work_done_progress: Some(true),
273 ..Default::default()
274 }),
275 ..Default::default()
276 },
277 trace: Default::default(),
278 workspace_folders: Default::default(),
279 client_info: Default::default(),
280 locale: Default::default(),
281 };
282
283 let this = self.clone();
284 let request = Self::request_internal::<request::Initialize>(
285 &this.next_id,
286 &this.response_handlers,
287 this.outbound_tx.read().as_ref(),
288 params,
289 );
290 let response = request.await?;
291 Self::notify_internal::<notification::Initialized>(
292 this.outbound_tx.read().as_ref(),
293 InitializedParams {},
294 )?;
295 Ok(response.capabilities)
296 }
297
298 pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Result<()>>> {
299 if let Some(tasks) = self.io_tasks.lock().take() {
300 let response_handlers = self.response_handlers.clone();
301 let outbound_tx = self.outbound_tx.write().take();
302 let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
303 let mut output_done = self.output_done_rx.lock().take().unwrap();
304 Some(async move {
305 Self::request_internal::<request::Shutdown>(
306 &next_id,
307 &response_handlers,
308 outbound_tx.as_ref(),
309 (),
310 )
311 .await?;
312 Self::notify_internal::<notification::Exit>(outbound_tx.as_ref(), ())?;
313 drop(outbound_tx);
314 output_done.recv().await;
315 drop(tasks);
316 Ok(())
317 })
318 } else {
319 None
320 }
321 }
322
323 pub fn on_notification<T, F>(&self, mut f: F) -> Subscription
324 where
325 T: notification::Notification,
326 F: 'static + Send + Sync + FnMut(T::Params),
327 {
328 let prev_handler = self.notification_handlers.write().insert(
329 T::METHOD,
330 Box::new(
331 move |notification| match serde_json::from_str(notification) {
332 Ok(notification) => f(notification),
333 Err(err) => log::error!("error parsing notification {}: {}", T::METHOD, err),
334 },
335 ),
336 );
337
338 assert!(
339 prev_handler.is_none(),
340 "registered multiple handlers for the same notification"
341 );
342
343 Subscription {
344 method: T::METHOD,
345 notification_handlers: self.notification_handlers.clone(),
346 }
347 }
348
349 pub fn capabilities(&self) -> watch::Receiver<Option<ServerCapabilities>> {
350 self.capabilities.clone()
351 }
352
353 pub fn request<T: request::Request>(
354 self: &Arc<Self>,
355 params: T::Params,
356 ) -> impl Future<Output = Result<T::Result>>
357 where
358 T::Result: 'static + Send,
359 {
360 let this = self.clone();
361 async move {
362 this.initialized.clone().recv().await;
363 Self::request_internal::<T>(
364 &this.next_id,
365 &this.response_handlers,
366 this.outbound_tx.read().as_ref(),
367 params,
368 )
369 .await
370 }
371 }
372
373 fn request_internal<T: request::Request>(
374 next_id: &AtomicUsize,
375 response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
376 outbound_tx: Option<&channel::Sender<Vec<u8>>>,
377 params: T::Params,
378 ) -> impl 'static + Future<Output = Result<T::Result>>
379 where
380 T::Result: 'static + Send,
381 {
382 let id = next_id.fetch_add(1, SeqCst);
383 let message = serde_json::to_vec(&Request {
384 jsonrpc: JSON_RPC_VERSION,
385 id,
386 method: T::METHOD,
387 params,
388 })
389 .unwrap();
390 let mut response_handlers = response_handlers.lock();
391 let (mut tx, mut rx) = oneshot::channel();
392 response_handlers.insert(
393 id,
394 Box::new(move |result| {
395 let response = match result {
396 Ok(response) => {
397 serde_json::from_str(response).context("failed to deserialize response")
398 }
399 Err(error) => Err(anyhow!("{}", error.message)),
400 };
401 let _ = tx.try_send(response);
402 }),
403 );
404
405 let send = outbound_tx
406 .as_ref()
407 .ok_or_else(|| {
408 anyhow!("tried to send a request to a language server that has been shut down")
409 })
410 .and_then(|outbound_tx| {
411 outbound_tx.try_send(message)?;
412 Ok(())
413 });
414 async move {
415 send?;
416 rx.recv().await.unwrap()
417 }
418 }
419
420 pub fn notify<T: notification::Notification>(
421 self: &Arc<Self>,
422 params: T::Params,
423 ) -> impl Future<Output = Result<()>> {
424 let this = self.clone();
425 async move {
426 this.initialized.clone().recv().await;
427 Self::notify_internal::<T>(this.outbound_tx.read().as_ref(), params)?;
428 Ok(())
429 }
430 }
431
432 fn notify_internal<T: notification::Notification>(
433 outbound_tx: Option<&channel::Sender<Vec<u8>>>,
434 params: T::Params,
435 ) -> Result<()> {
436 let message = serde_json::to_vec(&Notification {
437 jsonrpc: JSON_RPC_VERSION,
438 method: T::METHOD,
439 params,
440 })
441 .unwrap();
442 let outbound_tx = outbound_tx
443 .as_ref()
444 .ok_or_else(|| anyhow!("tried to notify a language server that has been shut down"))?;
445 outbound_tx.try_send(message)?;
446 Ok(())
447 }
448}
449
450impl Drop for LanguageServer {
451 fn drop(&mut self) {
452 if let Some(shutdown) = self.shutdown() {
453 self.executor.spawn(shutdown).detach();
454 }
455 }
456}
457
458impl Subscription {
459 pub fn detach(mut self) {
460 self.method = "";
461 }
462}
463
464impl Drop for Subscription {
465 fn drop(&mut self) {
466 self.notification_handlers.write().remove(self.method);
467 }
468}
469
/// In-process stand-in for a language server, driven directly by tests.
///
/// It sits at the server end of the two pipes created by
/// `LanguageServer::fake_with_capabilities`.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeLanguageServer {
    /// Scratch buffer; after `receive` it holds the body of the last message read.
    buffer: Vec<u8>,
    /// Stream from which messages written by the client are read.
    stdin: smol::io::BufReader<async_pipe::PipeReader>,
    /// Stream into which messages destined for the client are written.
    stdout: smol::io::BufWriter<async_pipe::PipeWriter>,
    /// Foreground executor, notified while `receive_request` is waiting.
    executor: std::rc::Rc<executor::Foreground>,
    /// `notify` panics while this flag is `false`.
    pub started: Arc<std::sync::atomic::AtomicBool>,
}
478
/// Id of a request received by the fake server, tagged with the request type `T` so that
/// `FakeLanguageServer::respond` accepts only a matching `T::Result`.
#[cfg(any(test, feature = "test-support"))]
pub struct RequestId<T> {
    /// The JSON-RPC id taken from the request.
    id: usize,
    /// Carries the request type without storing a value of it.
    _type: std::marker::PhantomData<T>,
}
484
#[cfg(any(test, feature = "test-support"))]
impl LanguageServer {
    /// Creates a client connected to a fake server that reports default capabilities.
    pub async fn fake(cx: &gpui::TestAppContext) -> (Arc<Self>, FakeLanguageServer) {
        let default_capabilities = ServerCapabilities::default();
        Self::fake_with_capabilities(default_capabilities, cx).await
    }

    /// Creates a client connected to a fake server over in-memory pipes and completes the
    /// initialize handshake, with the fake replying with `capabilities`.
    pub async fn fake_with_capabilities(
        capabilities: ServerCapabilities,
        cx: &gpui::TestAppContext,
    ) -> (Arc<Self>, FakeLanguageServer) {
        // One pipe per direction; each `pipe()` call yields the (writer, reader) pair.
        let (client_writer, fake_reader) = async_pipe::pipe();
        let (fake_writer, client_reader) = async_pipe::pipe();
        let mut fake = FakeLanguageServer {
            stdin: smol::io::BufReader::new(fake_reader),
            stdout: smol::io::BufWriter::new(fake_writer),
            buffer: Vec::new(),
            executor: cx.foreground(),
            started: Arc::new(std::sync::atomic::AtomicBool::new(true)),
        };

        let server =
            Self::new_internal(client_writer, client_reader, Path::new("/"), cx.background())
                .unwrap();

        // Play the server's part of the handshake started by `new_internal`.
        let (init_id, _) = fake.receive_request::<request::Initialize>().await;
        let init_result = InitializeResult {
            capabilities,
            ..Default::default()
        };
        fake.respond(init_id, init_result).await;
        fake.receive_notification::<notification::Initialized>()
            .await;

        (server, fake)
    }
}
523
#[cfg(any(test, feature = "test-support"))]
impl FakeLanguageServer {
    /// Sends a notification of type `T` to the client.
    ///
    /// # Panics
    ///
    /// Panics if the `started` flag is `false`.
    pub async fn notify<T: notification::Notification>(&mut self, params: T::Params) {
        assert!(
            self.started.load(SeqCst),
            "can't simulate an LSP notification before the server has been started"
        );
        let notification = Notification {
            jsonrpc: JSON_RPC_VERSION,
            method: T::METHOD,
            params,
        };
        let bytes = serde_json::to_vec(&notification).unwrap();
        self.send(bytes).await;
    }

    /// Replies to the request identified by `request_id` with a successful `result`.
    pub async fn respond<'a, T: request::Request>(
        &mut self,
        request_id: RequestId<T>,
        result: T::Result,
    ) {
        let bytes = {
            let raw_result =
                RawValue::from_string(serde_json::to_string(&result).unwrap()).unwrap();
            let response = AnyResponse {
                id: request_id.id,
                error: None,
                result: Some(&*raw_result),
            };
            serde_json::to_vec(&response).unwrap()
        };
        self.send(bytes).await;
    }

    /// Waits for the next message that parses as a request with `T`'s parameters, logging and
    /// skipping every other message.
    ///
    /// # Panics
    ///
    /// Panics if the parsed request's method or protocol version is not the expected one.
    pub async fn receive_request<T: request::Request>(&mut self) -> (RequestId<T>, T::Params) {
        let executor = self.executor.clone();
        executor.start_waiting();
        loop {
            self.receive().await;
            match serde_json::from_slice::<Request<T::Params>>(&self.buffer) {
                Ok(request) => {
                    assert_eq!(request.method, T::METHOD);
                    assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
                    executor.finish_waiting();
                    let request_id = RequestId {
                        id: request.id,
                        _type: std::marker::PhantomData,
                    };
                    return (request_id, request.params);
                }
                Err(_) => {
                    log::info!(
                        "skipping message in fake language server {:?}",
                        std::str::from_utf8(&self.buffer)
                    );
                }
            }
        }
    }

    /// Reads the next message and returns its parameters.
    ///
    /// # Panics
    ///
    /// Panics if the message is not a notification with `T`'s parameters and method name.
    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
        self.receive().await;
        let parsed: Notification<T::Params> = serde_json::from_slice(&self.buffer).unwrap();
        assert_eq!(parsed.method, T::METHOD);
        parsed.params
    }

    /// Sends a work-done progress "begin" notification for `token`.
    pub async fn start_progress(&mut self, token: impl Into<String>) {
        let begin = WorkDoneProgress::Begin(Default::default());
        self.notify_work_done(token.into(), begin).await;
    }

    /// Sends a work-done progress "end" notification for `token`.
    pub async fn end_progress(&mut self, token: impl Into<String>) {
        let end = WorkDoneProgress::End(Default::default());
        self.notify_work_done(token.into(), end).await;
    }

    /// Sends a `Progress` notification carrying `progress` for the string token `token`.
    async fn notify_work_done(&mut self, token: String, progress: WorkDoneProgress) {
        self.notify::<notification::Progress>(ProgressParams {
            token: NumberOrString::String(token),
            value: ProgressParamsValue::WorkDone(progress),
        })
        .await;
    }

    /// Writes `message` to the client, preceded by its content-length header, then flushes.
    async fn send(&mut self, message: Vec<u8>) {
        let content_len = message.len().to_string();
        let chunks: [&[u8]; 4] = [
            CONTENT_LEN_HEADER.as_bytes(),
            content_len.as_bytes(),
            "\r\n\r\n".as_bytes(),
            &message,
        ];
        for chunk in chunks.iter().copied() {
            self.stdout.write_all(chunk).await.unwrap();
        }
        self.stdout.flush().await.unwrap();
    }

    /// Reads one framed message from the client into `self.buffer`.
    ///
    /// Expects exactly two header lines: the content-length line and the blank line after it.
    async fn receive(&mut self) {
        self.buffer.clear();
        for _ in 0..2 {
            self.stdin
                .read_until(b'\n', &mut self.buffer)
                .await
                .unwrap();
        }
        let header = std::str::from_utf8(&self.buffer).unwrap();
        let message_len: usize = header
            .strip_prefix(CONTENT_LEN_HEADER)
            .unwrap()
            .trim_end()
            .parse()
            .unwrap();
        self.buffer.resize(message_len, 0);
        self.stdin.read_exact(&mut self.buffer).await.unwrap();
    }
}
637
#[cfg(test)]
mod tests {
    use super::*;
    use gpui::TestAppContext;
    use simplelog::SimpleLogger;
    use unindent::Unindent;
    use util::test::temp_tree;

    /// End-to-end check against a real server: opens a small crate and asserts on the hover
    /// result. Spawns the command `rust-analyzer`, so that binary must be available.
    #[gpui::test]
    async fn test_rust_analyzer(cx: TestAppContext) {
        let lib_source = r#"
 fn fun() {
 let hello = "world";
 }
 "#
        .unindent();
        let root_dir = temp_tree(json!({
            "Cargo.toml": r#"
 [package]
 name = "temp"
 version = "0.1.0"
 edition = "2018"
 "#.unindent(),
            "src": {
                "lib.rs": &lib_source
            }
        }));
        let lib_file_uri = Url::from_file_path(root_dir.path().join("src/lib.rs")).unwrap();

        let server = cx.read(|cx| {
            let background = cx.background().clone();
            LanguageServer::new(Path::new("rust-analyzer"), root_dir.path(), background).unwrap()
        });
        // Wait until the server reports that it is quiescent before querying it.
        server.next_idle_notification().await;

        let open_params = DidOpenTextDocumentParams {
            text_document: TextDocumentItem::new(
                lib_file_uri.clone(),
                "rust".to_string(),
                0,
                lib_source,
            ),
        };
        server
            .notify::<notification::DidOpenTextDocument>(open_params)
            .await
            .unwrap();

        let hover_params = HoverParams {
            text_document_position_params: TextDocumentPositionParams {
                text_document: TextDocumentIdentifier::new(lib_file_uri),
                position: Position::new(1, 21),
            },
            work_done_progress_params: Default::default(),
        };
        let hover = server
            .request::<request::HoverRequest>(hover_params)
            .await
            .unwrap()
            .unwrap();
        let expected_contents = HoverContents::Markup(MarkupContent {
            kind: MarkupKind::PlainText,
            value: "&str".to_string(),
        });
        assert_eq!(hover.contents, expected_contents);
    }

    /// Exercises the client against the fake server: a client notification reaches the fake,
    /// fake notifications reach the registered handlers, and dropping the client triggers the
    /// shutdown request followed by the exit notification.
    #[gpui::test]
    async fn test_fake(cx: TestAppContext) {
        SimpleLogger::init(log::LevelFilter::Info, Default::default()).unwrap();

        let (server, mut fake) = LanguageServer::fake(&cx).await;

        // Forward handled notifications into channels the test can await.
        let (message_tx, message_rx) = channel::unbounded();
        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
        server
            .on_notification::<notification::ShowMessage, _>(move |params| {
                message_tx.try_send(params).unwrap()
            })
            .detach();
        server
            .on_notification::<notification::PublishDiagnostics, _>(move |params| {
                diagnostics_tx.try_send(params).unwrap()
            })
            .detach();

        let open_params = DidOpenTextDocumentParams {
            text_document: TextDocumentItem::new(
                Url::from_str("file://a/b").unwrap(),
                "rust".to_string(),
                0,
                "".to_string(),
            ),
        };
        server
            .notify::<notification::DidOpenTextDocument>(open_params)
            .await
            .unwrap();
        let opened = fake
            .receive_notification::<notification::DidOpenTextDocument>()
            .await;
        assert_eq!(opened.text_document.uri.as_str(), "file://a/b");

        fake.notify::<notification::ShowMessage>(ShowMessageParams {
            typ: MessageType::ERROR,
            message: "ok".to_string(),
        })
        .await;
        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
            uri: Url::from_str("file://b/c").unwrap(),
            version: Some(5),
            diagnostics: vec![],
        })
        .await;
        let shown = message_rx.recv().await.unwrap();
        assert_eq!(shown.message, "ok");
        let published = diagnostics_rx.recv().await.unwrap();
        assert_eq!(published.uri.as_str(), "file://b/c");

        // Dropping the client starts the shutdown sequence in the background.
        drop(server);
        let (shutdown_request, _) = fake.receive_request::<request::Shutdown>().await;
        fake.respond(shutdown_request, ()).await;
        fake.receive_notification::<notification::Exit>().await;
    }

    impl LanguageServer {
        /// Waits for the next server status notification whose `quiescent` flag is set.
        async fn next_idle_notification(self: &Arc<Self>) {
            let (idle_tx, idle_rx) = channel::unbounded();
            // Keep the subscription alive until the end of this function.
            let _subscription =
                self.on_notification::<ServerStatusNotification, _>(move |status| {
                    if status.quiescent {
                        idle_tx.try_send(()).unwrap();
                    }
                });
            let _ = idle_rx.recv().await;
        }
    }

    /// Marker type for the `experimental/serverStatus` notification.
    pub enum ServerStatusNotification {}

    impl notification::Notification for ServerStatusNotification {
        type Params = ServerStatusParams;
        const METHOD: &'static str = "experimental/serverStatus";
    }

    /// Parameters of `ServerStatusNotification`; only the `quiescent` flag is modelled.
    #[derive(Deserialize, Serialize, PartialEq, Eq, Clone)]
    pub struct ServerStatusParams {
        pub quiescent: bool,
    }
}