lsp.rs

  1use anyhow::{anyhow, Context, Result};
  2use futures::{io::BufWriter, AsyncRead, AsyncWrite};
  3use gpui::{executor, Task};
  4use parking_lot::{Mutex, RwLock};
  5use postage::{barrier, oneshot, prelude::Stream, sink::Sink, watch};
  6use serde::{Deserialize, Serialize};
  7use serde_json::{json, value::RawValue, Value};
  8use smol::{
  9    channel,
 10    io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
 11    process::Command,
 12};
 13use std::{
 14    collections::HashMap,
 15    future::Future,
 16    io::Write,
 17    str::FromStr,
 18    sync::{
 19        atomic::{AtomicUsize, Ordering::SeqCst},
 20        Arc,
 21    },
 22};
 23use std::{path::Path, process::Stdio};
 24use util::TryFutureExt;
 25
 26pub use lsp_types::*;
 27
 28const JSON_RPC_VERSION: &'static str = "2.0";
 29const CONTENT_LEN_HEADER: &'static str = "Content-Length: ";
 30
 31type NotificationHandler = Box<dyn Send + Sync + FnMut(&str)>;
 32type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
 33
 34pub struct LanguageServer {
 35    next_id: AtomicUsize,
 36    outbound_tx: RwLock<Option<channel::Sender<Vec<u8>>>>,
 37    capabilities: watch::Receiver<Option<ServerCapabilities>>,
 38    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
 39    response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
 40    executor: Arc<executor::Background>,
 41    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
 42    initialized: barrier::Receiver,
 43    output_done_rx: Mutex<Option<barrier::Receiver>>,
 44}
 45
 46pub struct Subscription {
 47    method: &'static str,
 48    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
 49}
 50
 51#[derive(Serialize, Deserialize)]
 52struct Request<'a, T> {
 53    jsonrpc: &'a str,
 54    id: usize,
 55    method: &'a str,
 56    params: T,
 57}
 58
 59#[derive(Serialize, Deserialize)]
 60struct AnyResponse<'a> {
 61    id: usize,
 62    #[serde(default)]
 63    error: Option<Error>,
 64    #[serde(borrow)]
 65    result: Option<&'a RawValue>,
 66}
 67
 68#[derive(Serialize, Deserialize)]
 69struct Notification<'a, T> {
 70    #[serde(borrow)]
 71    jsonrpc: &'a str,
 72    #[serde(borrow)]
 73    method: &'a str,
 74    params: T,
 75}
 76
 77#[derive(Deserialize)]
 78struct AnyNotification<'a> {
 79    #[serde(borrow)]
 80    method: &'a str,
 81    #[serde(borrow)]
 82    params: &'a RawValue,
 83}
 84
 85#[derive(Debug, Serialize, Deserialize)]
 86struct Error {
 87    message: String,
 88}
 89
 90impl LanguageServer {
 91    pub fn new(
 92        binary_path: &Path,
 93        root_path: &Path,
 94        background: Arc<executor::Background>,
 95    ) -> Result<Arc<Self>> {
 96        let mut server = Command::new(binary_path)
 97            .stdin(Stdio::piped())
 98            .stdout(Stdio::piped())
 99            .stderr(Stdio::inherit())
100            .spawn()?;
101        let stdin = server.stdin.take().unwrap();
102        let stdout = server.stdout.take().unwrap();
103        Self::new_internal(stdin, stdout, root_path, background)
104    }
105
106    fn new_internal<Stdin, Stdout>(
107        stdin: Stdin,
108        stdout: Stdout,
109        root_path: &Path,
110        executor: Arc<executor::Background>,
111    ) -> Result<Arc<Self>>
112    where
113        Stdin: AsyncWrite + Unpin + Send + 'static,
114        Stdout: AsyncRead + Unpin + Send + 'static,
115    {
116        let mut stdin = BufWriter::new(stdin);
117        let mut stdout = BufReader::new(stdout);
118        let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
119        let notification_handlers = Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::new()));
120        let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::new()));
121        let input_task = executor.spawn(
122            {
123                let notification_handlers = notification_handlers.clone();
124                let response_handlers = response_handlers.clone();
125                async move {
126                    let mut buffer = Vec::new();
127                    loop {
128                        buffer.clear();
129                        stdout.read_until(b'\n', &mut buffer).await?;
130                        stdout.read_until(b'\n', &mut buffer).await?;
131                        let message_len: usize = std::str::from_utf8(&buffer)?
132                            .strip_prefix(CONTENT_LEN_HEADER)
133                            .ok_or_else(|| anyhow!("invalid header"))?
134                            .trim_end()
135                            .parse()?;
136
137                        buffer.resize(message_len, 0);
138                        stdout.read_exact(&mut buffer).await?;
139
140                        if let Ok(AnyNotification { method, params }) =
141                            serde_json::from_slice(&buffer)
142                        {
143                            if let Some(handler) = notification_handlers.write().get_mut(method) {
144                                handler(params.get());
145                            } else {
146                                log::info!(
147                                    "unhandled notification {}:\n{}",
148                                    method,
149                                    serde_json::to_string_pretty(
150                                        &Value::from_str(params.get()).unwrap()
151                                    )
152                                    .unwrap()
153                                );
154                            }
155                        } else if let Ok(AnyResponse { id, error, result }) =
156                            serde_json::from_slice(&buffer)
157                        {
158                            if let Some(handler) = response_handlers.lock().remove(&id) {
159                                if let Some(error) = error {
160                                    handler(Err(error));
161                                } else if let Some(result) = result {
162                                    handler(Ok(result.get()));
163                                } else {
164                                    handler(Ok("null"));
165                                }
166                            }
167                        } else {
168                            return Err(anyhow!(
169                                "failed to deserialize message:\n{}",
170                                std::str::from_utf8(&buffer)?
171                            ));
172                        }
173                    }
174                }
175            }
176            .log_err(),
177        );
178        let (output_done_tx, output_done_rx) = barrier::channel();
179        let output_task = executor.spawn(
180            async move {
181                let mut content_len_buffer = Vec::new();
182                while let Ok(message) = outbound_rx.recv().await {
183                    content_len_buffer.clear();
184                    write!(content_len_buffer, "{}", message.len()).unwrap();
185                    stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
186                    stdin.write_all(&content_len_buffer).await?;
187                    stdin.write_all("\r\n\r\n".as_bytes()).await?;
188                    stdin.write_all(&message).await?;
189                    stdin.flush().await?;
190                }
191                drop(output_done_tx);
192                Ok(())
193            }
194            .log_err(),
195        );
196
197        let (initialized_tx, initialized_rx) = barrier::channel();
198        let (mut capabilities_tx, capabilities_rx) = watch::channel();
199        let this = Arc::new(Self {
200            notification_handlers,
201            response_handlers,
202            capabilities: capabilities_rx,
203            next_id: Default::default(),
204            outbound_tx: RwLock::new(Some(outbound_tx)),
205            executor: executor.clone(),
206            io_tasks: Mutex::new(Some((input_task, output_task))),
207            initialized: initialized_rx,
208            output_done_rx: Mutex::new(Some(output_done_rx)),
209        });
210
211        let root_uri = Url::from_file_path(root_path).map_err(|_| anyhow!("invalid root path"))?;
212        executor
213            .spawn({
214                let this = this.clone();
215                async move {
216                    if let Some(capabilities) = this.init(root_uri).log_err().await {
217                        *capabilities_tx.borrow_mut() = Some(capabilities);
218                    }
219
220                    drop(initialized_tx);
221                }
222            })
223            .detach();
224
225        Ok(this)
226    }
227
228    async fn init(self: Arc<Self>, root_uri: Url) -> Result<ServerCapabilities> {
229        #[allow(deprecated)]
230        let params = InitializeParams {
231            process_id: Default::default(),
232            root_path: Default::default(),
233            root_uri: Some(root_uri),
234            initialization_options: Default::default(),
235            capabilities: ClientCapabilities {
236                text_document: Some(TextDocumentClientCapabilities {
237                    definition: Some(GotoCapability {
238                        link_support: Some(true),
239                        ..Default::default()
240                    }),
241                    completion: Some(CompletionClientCapabilities {
242                        completion_item: Some(CompletionItemCapability {
243                            snippet_support: Some(true),
244                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
245                                properties: vec!["additionalTextEdits".to_string()],
246                            }),
247                            ..Default::default()
248                        }),
249                        ..Default::default()
250                    }),
251                    ..Default::default()
252                }),
253                experimental: Some(json!({
254                    "serverStatusNotification": true,
255                })),
256                window: Some(WindowClientCapabilities {
257                    work_done_progress: Some(true),
258                    ..Default::default()
259                }),
260                ..Default::default()
261            },
262            trace: Default::default(),
263            workspace_folders: Default::default(),
264            client_info: Default::default(),
265            locale: Default::default(),
266        };
267
268        let this = self.clone();
269        let request = Self::request_internal::<request::Initialize>(
270            &this.next_id,
271            &this.response_handlers,
272            this.outbound_tx.read().as_ref(),
273            params,
274        );
275        let response = request.await?;
276        Self::notify_internal::<notification::Initialized>(
277            this.outbound_tx.read().as_ref(),
278            InitializedParams {},
279        )?;
280        Ok(response.capabilities)
281    }
282
283    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Result<()>>> {
284        if let Some(tasks) = self.io_tasks.lock().take() {
285            let response_handlers = self.response_handlers.clone();
286            let outbound_tx = self.outbound_tx.write().take();
287            let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
288            let mut output_done = self.output_done_rx.lock().take().unwrap();
289            Some(async move {
290                Self::request_internal::<request::Shutdown>(
291                    &next_id,
292                    &response_handlers,
293                    outbound_tx.as_ref(),
294                    (),
295                )
296                .await?;
297                Self::notify_internal::<notification::Exit>(outbound_tx.as_ref(), ())?;
298                drop(outbound_tx);
299                output_done.recv().await;
300                drop(tasks);
301                Ok(())
302            })
303        } else {
304            None
305        }
306    }
307
308    pub fn on_notification<T, F>(&self, mut f: F) -> Subscription
309    where
310        T: notification::Notification,
311        F: 'static + Send + Sync + FnMut(T::Params),
312    {
313        let prev_handler = self.notification_handlers.write().insert(
314            T::METHOD,
315            Box::new(
316                move |notification| match serde_json::from_str(notification) {
317                    Ok(notification) => f(notification),
318                    Err(err) => log::error!("error parsing notification {}: {}", T::METHOD, err),
319                },
320            ),
321        );
322
323        assert!(
324            prev_handler.is_none(),
325            "registered multiple handlers for the same notification"
326        );
327
328        Subscription {
329            method: T::METHOD,
330            notification_handlers: self.notification_handlers.clone(),
331        }
332    }
333
334    pub fn capabilities(&self) -> watch::Receiver<Option<ServerCapabilities>> {
335        self.capabilities.clone()
336    }
337
338    pub fn request<T: request::Request>(
339        self: &Arc<Self>,
340        params: T::Params,
341    ) -> impl Future<Output = Result<T::Result>>
342    where
343        T::Result: 'static + Send,
344    {
345        let this = self.clone();
346        async move {
347            this.initialized.clone().recv().await;
348            Self::request_internal::<T>(
349                &this.next_id,
350                &this.response_handlers,
351                this.outbound_tx.read().as_ref(),
352                params,
353            )
354            .await
355        }
356    }
357
358    fn request_internal<T: request::Request>(
359        next_id: &AtomicUsize,
360        response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
361        outbound_tx: Option<&channel::Sender<Vec<u8>>>,
362        params: T::Params,
363    ) -> impl 'static + Future<Output = Result<T::Result>>
364    where
365        T::Result: 'static + Send,
366    {
367        let id = next_id.fetch_add(1, SeqCst);
368        let message = serde_json::to_vec(&Request {
369            jsonrpc: JSON_RPC_VERSION,
370            id,
371            method: T::METHOD,
372            params,
373        })
374        .unwrap();
375        let mut response_handlers = response_handlers.lock();
376        let (mut tx, mut rx) = oneshot::channel();
377        response_handlers.insert(
378            id,
379            Box::new(move |result| {
380                let response = match result {
381                    Ok(response) => {
382                        serde_json::from_str(response).context("failed to deserialize response")
383                    }
384                    Err(error) => Err(anyhow!("{}", error.message)),
385                };
386                let _ = tx.try_send(response);
387            }),
388        );
389
390        let send = outbound_tx
391            .as_ref()
392            .ok_or_else(|| {
393                anyhow!("tried to send a request to a language server that has been shut down")
394            })
395            .and_then(|outbound_tx| {
396                outbound_tx.try_send(message)?;
397                Ok(())
398            });
399        async move {
400            send?;
401            rx.recv().await.unwrap()
402        }
403    }
404
405    pub fn notify<T: notification::Notification>(
406        self: &Arc<Self>,
407        params: T::Params,
408    ) -> impl Future<Output = Result<()>> {
409        let this = self.clone();
410        async move {
411            this.initialized.clone().recv().await;
412            Self::notify_internal::<T>(this.outbound_tx.read().as_ref(), params)?;
413            Ok(())
414        }
415    }
416
417    fn notify_internal<T: notification::Notification>(
418        outbound_tx: Option<&channel::Sender<Vec<u8>>>,
419        params: T::Params,
420    ) -> Result<()> {
421        let message = serde_json::to_vec(&Notification {
422            jsonrpc: JSON_RPC_VERSION,
423            method: T::METHOD,
424            params,
425        })
426        .unwrap();
427        let outbound_tx = outbound_tx
428            .as_ref()
429            .ok_or_else(|| anyhow!("tried to notify a language server that has been shut down"))?;
430        outbound_tx.try_send(message)?;
431        Ok(())
432    }
433}
434
435impl Drop for LanguageServer {
436    fn drop(&mut self) {
437        if let Some(shutdown) = self.shutdown() {
438            self.executor.spawn(shutdown).detach();
439        }
440    }
441}
442
443impl Subscription {
444    pub fn detach(mut self) {
445        self.method = "";
446    }
447}
448
449impl Drop for Subscription {
450    fn drop(&mut self) {
451        self.notification_handlers.write().remove(self.method);
452    }
453}
454
/// In-process stand-in for a language server, connected to a `LanguageServer` through a
/// pair of pipes.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeLanguageServer {
    /// Scratch buffer holding the most recently received message.
    buffer: Vec<u8>,
    /// Reading end of the pipe the client writes its messages to.
    stdin: smol::io::BufReader<async_pipe::PipeReader>,
    /// Writing end of the pipe the client reads its messages from.
    stdout: smol::io::BufWriter<async_pipe::PipeWriter>,
    /// Checked by `notify`, which panics while this is `false`.
    pub started: Arc<std::sync::atomic::AtomicBool>,
}
462
/// Id of a received request, tagged with the request type `T` so that `respond` is
/// given a result of the matching type.
#[cfg(any(test, feature = "test-support"))]
pub struct RequestId<T> {
    id: usize,
    _type: std::marker::PhantomData<T>,
}
468
#[cfg(any(test, feature = "test-support"))]
impl LanguageServer {
    /// Creates a client connected to a fake server that reports default capabilities.
    pub async fn fake(executor: Arc<executor::Background>) -> (Arc<Self>, FakeLanguageServer) {
        let capabilities = ServerCapabilities::default();
        Self::fake_with_capabilities(capabilities, executor).await
    }

    /// Creates a client connected to a fake server reporting `capabilities`, and drives
    /// the initialize handshake to completion before returning both ends.
    pub async fn fake_with_capabilities(
        capabilities: ServerCapabilities,
        executor: Arc<executor::Background>,
    ) -> (Arc<Self>, FakeLanguageServer) {
        let (stdin_writer, stdin_reader) = async_pipe::pipe();
        let (stdout_writer, stdout_reader) = async_pipe::pipe();
        let mut fake = FakeLanguageServer {
            stdin: smol::io::BufReader::new(stdin_reader),
            stdout: smol::io::BufWriter::new(stdout_writer),
            buffer: Vec::new(),
            started: Arc::new(std::sync::atomic::AtomicBool::new(true)),
        };

        let server =
            Self::new_internal(stdin_writer, stdout_reader, Path::new("/"), executor).unwrap();

        // Play the server's part of the handshake.
        let (init_id, _) = fake.receive_request::<request::Initialize>().await;
        let init_result = InitializeResult {
            capabilities,
            ..Default::default()
        };
        fake.respond(init_id, init_result).await;
        fake.receive_notification::<notification::Initialized>()
            .await;

        (server, fake)
    }
}
505
506#[cfg(any(test, feature = "test-support"))]
507impl FakeLanguageServer {
508    pub async fn notify<T: notification::Notification>(&mut self, params: T::Params) {
509        if !self.started.load(std::sync::atomic::Ordering::SeqCst) {
510            panic!("can't simulate an LSP notification before the server has been started");
511        }
512        let message = serde_json::to_vec(&Notification {
513            jsonrpc: JSON_RPC_VERSION,
514            method: T::METHOD,
515            params,
516        })
517        .unwrap();
518        self.send(message).await;
519    }
520
521    pub async fn respond<'a, T: request::Request>(
522        &mut self,
523        request_id: RequestId<T>,
524        result: T::Result,
525    ) {
526        let result = serde_json::to_string(&result).unwrap();
527        let message = serde_json::to_vec(&AnyResponse {
528            id: request_id.id,
529            error: None,
530            result: Some(&RawValue::from_string(result).unwrap()),
531        })
532        .unwrap();
533        self.send(message).await;
534    }
535
536    pub async fn receive_request<T: request::Request>(&mut self) -> (RequestId<T>, T::Params) {
537        loop {
538            self.receive().await;
539            if let Ok(request) = serde_json::from_slice::<Request<T::Params>>(&self.buffer) {
540                assert_eq!(request.method, T::METHOD);
541                assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
542                return (
543                    RequestId {
544                        id: request.id,
545                        _type: std::marker::PhantomData,
546                    },
547                    request.params,
548                );
549            } else {
550                println!(
551                    "skipping message in fake language server {:?}",
552                    std::str::from_utf8(&self.buffer)
553                );
554            }
555        }
556    }
557
558    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
559        self.receive().await;
560        let notification = serde_json::from_slice::<Notification<T::Params>>(&self.buffer).unwrap();
561        assert_eq!(notification.method, T::METHOD);
562        notification.params
563    }
564
565    pub async fn start_progress(&mut self, token: impl Into<String>) {
566        self.notify::<notification::Progress>(ProgressParams {
567            token: NumberOrString::String(token.into()),
568            value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
569        })
570        .await;
571    }
572
573    pub async fn end_progress(&mut self, token: impl Into<String>) {
574        self.notify::<notification::Progress>(ProgressParams {
575            token: NumberOrString::String(token.into()),
576            value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
577        })
578        .await;
579    }
580
581    async fn send(&mut self, message: Vec<u8>) {
582        self.stdout
583            .write_all(CONTENT_LEN_HEADER.as_bytes())
584            .await
585            .unwrap();
586        self.stdout
587            .write_all((format!("{}", message.len())).as_bytes())
588            .await
589            .unwrap();
590        self.stdout.write_all("\r\n\r\n".as_bytes()).await.unwrap();
591        self.stdout.write_all(&message).await.unwrap();
592        self.stdout.flush().await.unwrap();
593    }
594
595    async fn receive(&mut self) {
596        self.buffer.clear();
597        self.stdin
598            .read_until(b'\n', &mut self.buffer)
599            .await
600            .unwrap();
601        self.stdin
602            .read_until(b'\n', &mut self.buffer)
603            .await
604            .unwrap();
605        let message_len: usize = std::str::from_utf8(&self.buffer)
606            .unwrap()
607            .strip_prefix(CONTENT_LEN_HEADER)
608            .unwrap()
609            .trim_end()
610            .parse()
611            .unwrap();
612        self.buffer.resize(message_len, 0);
613        self.stdin.read_exact(&mut self.buffer).await.unwrap();
614    }
615}
616
617#[cfg(test)]
618mod tests {
619    use super::*;
620    use gpui::TestAppContext;
621    use simplelog::SimpleLogger;
622    use unindent::Unindent;
623    use util::test::temp_tree;
624
625    #[gpui::test]
626    async fn test_rust_analyzer(cx: TestAppContext) {
627        let lib_source = r#"
628            fn fun() {
629                let hello = "world";
630            }
631        "#
632        .unindent();
633        let root_dir = temp_tree(json!({
634            "Cargo.toml": r#"
635                [package]
636                name = "temp"
637                version = "0.1.0"
638                edition = "2018"
639            "#.unindent(),
640            "src": {
641                "lib.rs": &lib_source
642            }
643        }));
644        let lib_file_uri = Url::from_file_path(root_dir.path().join("src/lib.rs")).unwrap();
645
646        let server = cx.read(|cx| {
647            LanguageServer::new(
648                Path::new("rust-analyzer"),
649                root_dir.path(),
650                cx.background().clone(),
651            )
652            .unwrap()
653        });
654        server.next_idle_notification().await;
655
656        server
657            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
658                text_document: TextDocumentItem::new(
659                    lib_file_uri.clone(),
660                    "rust".to_string(),
661                    0,
662                    lib_source,
663                ),
664            })
665            .await
666            .unwrap();
667
668        let hover = server
669            .request::<request::HoverRequest>(HoverParams {
670                text_document_position_params: TextDocumentPositionParams {
671                    text_document: TextDocumentIdentifier::new(lib_file_uri),
672                    position: Position::new(1, 21),
673                },
674                work_done_progress_params: Default::default(),
675            })
676            .await
677            .unwrap()
678            .unwrap();
679        assert_eq!(
680            hover.contents,
681            HoverContents::Markup(MarkupContent {
682                kind: MarkupKind::PlainText,
683                value: "&str".to_string()
684            })
685        );
686    }
687
688    #[gpui::test]
689    async fn test_fake(cx: TestAppContext) {
690        SimpleLogger::init(log::LevelFilter::Info, Default::default()).unwrap();
691
692        let (server, mut fake) = LanguageServer::fake(cx.background()).await;
693
694        let (message_tx, message_rx) = channel::unbounded();
695        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
696        server
697            .on_notification::<notification::ShowMessage, _>(move |params| {
698                message_tx.try_send(params).unwrap()
699            })
700            .detach();
701        server
702            .on_notification::<notification::PublishDiagnostics, _>(move |params| {
703                diagnostics_tx.try_send(params).unwrap()
704            })
705            .detach();
706
707        server
708            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
709                text_document: TextDocumentItem::new(
710                    Url::from_str("file://a/b").unwrap(),
711                    "rust".to_string(),
712                    0,
713                    "".to_string(),
714                ),
715            })
716            .await
717            .unwrap();
718        assert_eq!(
719            fake.receive_notification::<notification::DidOpenTextDocument>()
720                .await
721                .text_document
722                .uri
723                .as_str(),
724            "file://a/b"
725        );
726
727        fake.notify::<notification::ShowMessage>(ShowMessageParams {
728            typ: MessageType::ERROR,
729            message: "ok".to_string(),
730        })
731        .await;
732        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
733            uri: Url::from_str("file://b/c").unwrap(),
734            version: Some(5),
735            diagnostics: vec![],
736        })
737        .await;
738        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
739        assert_eq!(
740            diagnostics_rx.recv().await.unwrap().uri.as_str(),
741            "file://b/c"
742        );
743
744        drop(server);
745        let (shutdown_request, _) = fake.receive_request::<request::Shutdown>().await;
746        fake.respond(shutdown_request, ()).await;
747        fake.receive_notification::<notification::Exit>().await;
748    }
749
750    impl LanguageServer {
751        async fn next_idle_notification(self: &Arc<Self>) {
752            let (tx, rx) = channel::unbounded();
753            let _subscription =
754                self.on_notification::<ServerStatusNotification, _>(move |params| {
755                    if params.quiescent {
756                        tx.try_send(()).unwrap();
757                    }
758                });
759            let _ = rx.recv().await;
760        }
761    }
762
763    pub enum ServerStatusNotification {}
764
765    impl notification::Notification for ServerStatusNotification {
766        type Params = ServerStatusParams;
767        const METHOD: &'static str = "experimental/serverStatus";
768    }
769
770    #[derive(Deserialize, Serialize, PartialEq, Eq, Clone)]
771    pub struct ServerStatusParams {
772        pub quiescent: bool,
773    }
774}