1use anyhow::{anyhow, Context, Result};
2use futures::{io::BufWriter, AsyncRead, AsyncWrite};
3use gpui::{executor, Task};
4use parking_lot::{Mutex, RwLock};
5use postage::{barrier, oneshot, prelude::Stream, sink::Sink, watch};
6use serde::{Deserialize, Serialize};
7use serde_json::{json, value::RawValue, Value};
8use smol::{
9 channel,
10 io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
11 process::Command,
12};
13use std::{
14 collections::HashMap,
15 future::Future,
16 io::Write,
17 str::FromStr,
18 sync::{
19 atomic::{AtomicUsize, Ordering::SeqCst},
20 Arc,
21 },
22};
23use std::{path::Path, process::Stdio};
24use util::TryFutureExt;
25
26pub use lsp_types::*;
27
28const JSON_RPC_VERSION: &'static str = "2.0";
29const CONTENT_LEN_HEADER: &'static str = "Content-Length: ";
30
31type NotificationHandler = Box<dyn Send + Sync + FnMut(&str)>;
32type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
33
34pub struct LanguageServer {
35 next_id: AtomicUsize,
36 outbound_tx: channel::Sender<Vec<u8>>,
37 capabilities: watch::Receiver<Option<ServerCapabilities>>,
38 notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
39 response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
40 executor: Arc<executor::Background>,
41 io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
42 initialized: barrier::Receiver,
43 output_done_rx: Mutex<Option<barrier::Receiver>>,
44}
45
46pub struct Subscription {
47 method: &'static str,
48 notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
49}
50
51#[derive(Serialize, Deserialize)]
52struct Request<'a, T> {
53 jsonrpc: &'a str,
54 id: usize,
55 method: &'a str,
56 params: T,
57}
58
59#[cfg(any(test, feature = "test-support"))]
60#[derive(Deserialize)]
61struct AnyRequest<'a> {
62 id: usize,
63 #[serde(borrow)]
64 jsonrpc: &'a str,
65 #[serde(borrow)]
66 method: &'a str,
67 #[serde(borrow)]
68 params: &'a RawValue,
69}
70
71#[derive(Serialize, Deserialize)]
72struct AnyResponse<'a> {
73 id: usize,
74 #[serde(default)]
75 error: Option<Error>,
76 #[serde(borrow)]
77 result: Option<&'a RawValue>,
78}
79
80#[derive(Serialize, Deserialize)]
81struct Notification<'a, T> {
82 #[serde(borrow)]
83 jsonrpc: &'a str,
84 #[serde(borrow)]
85 method: &'a str,
86 params: T,
87}
88
89#[derive(Deserialize)]
90struct AnyNotification<'a> {
91 #[serde(borrow)]
92 method: &'a str,
93 #[serde(borrow)]
94 params: &'a RawValue,
95}
96
97#[derive(Debug, Serialize, Deserialize)]
98struct Error {
99 message: String,
100}
101
102impl LanguageServer {
103 pub fn new(
104 binary_path: &Path,
105 root_path: &Path,
106 background: Arc<executor::Background>,
107 ) -> Result<Arc<Self>> {
108 let mut server = Command::new(binary_path)
109 .stdin(Stdio::piped())
110 .stdout(Stdio::piped())
111 .stderr(Stdio::inherit())
112 .spawn()?;
113 let stdin = server.stdin.take().unwrap();
114 let stdout = server.stdout.take().unwrap();
115 Self::new_internal(stdin, stdout, root_path, background)
116 }
117
118 fn new_internal<Stdin, Stdout>(
119 stdin: Stdin,
120 stdout: Stdout,
121 root_path: &Path,
122 executor: Arc<executor::Background>,
123 ) -> Result<Arc<Self>>
124 where
125 Stdin: AsyncWrite + Unpin + Send + 'static,
126 Stdout: AsyncRead + Unpin + Send + 'static,
127 {
128 let mut stdin = BufWriter::new(stdin);
129 let mut stdout = BufReader::new(stdout);
130 let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
131 let notification_handlers = Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::new()));
132 let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::new()));
133 let input_task = executor.spawn(
134 {
135 let notification_handlers = notification_handlers.clone();
136 let response_handlers = response_handlers.clone();
137 async move {
138 let mut buffer = Vec::new();
139 loop {
140 buffer.clear();
141 stdout.read_until(b'\n', &mut buffer).await?;
142 stdout.read_until(b'\n', &mut buffer).await?;
143 let message_len: usize = std::str::from_utf8(&buffer)?
144 .strip_prefix(CONTENT_LEN_HEADER)
145 .ok_or_else(|| anyhow!("invalid header"))?
146 .trim_end()
147 .parse()?;
148
149 buffer.resize(message_len, 0);
150 stdout.read_exact(&mut buffer).await?;
151
152 if let Ok(AnyNotification { method, params }) =
153 serde_json::from_slice(&buffer)
154 {
155 if let Some(handler) = notification_handlers.write().get_mut(method) {
156 handler(params.get());
157 } else {
158 log::info!(
159 "unhandled notification {}:\n{}",
160 method,
161 serde_json::to_string_pretty(
162 &Value::from_str(params.get()).unwrap()
163 )
164 .unwrap()
165 );
166 }
167 } else if let Ok(AnyResponse { id, error, result }) =
168 serde_json::from_slice(&buffer)
169 {
170 if let Some(handler) = response_handlers.lock().remove(&id) {
171 if let Some(error) = error {
172 handler(Err(error));
173 } else if let Some(result) = result {
174 handler(Ok(result.get()));
175 } else {
176 handler(Ok("null"));
177 }
178 }
179 } else {
180 return Err(anyhow!(
181 "failed to deserialize message:\n{}",
182 std::str::from_utf8(&buffer)?
183 ));
184 }
185 }
186 }
187 }
188 .log_err(),
189 );
190 let (output_done_tx, output_done_rx) = barrier::channel();
191 let output_task = executor.spawn(
192 async move {
193 let mut content_len_buffer = Vec::new();
194 while let Ok(message) = outbound_rx.recv().await {
195 content_len_buffer.clear();
196 write!(content_len_buffer, "{}", message.len()).unwrap();
197 stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
198 stdin.write_all(&content_len_buffer).await?;
199 stdin.write_all("\r\n\r\n".as_bytes()).await?;
200 stdin.write_all(&message).await?;
201 stdin.flush().await?;
202 }
203 drop(output_done_tx);
204 Ok(())
205 }
206 .log_err(),
207 );
208
209 let (initialized_tx, initialized_rx) = barrier::channel();
210 let (mut capabilities_tx, capabilities_rx) = watch::channel();
211 let this = Arc::new(Self {
212 notification_handlers,
213 response_handlers,
214 capabilities: capabilities_rx,
215 next_id: Default::default(),
216 outbound_tx,
217 executor: executor.clone(),
218 io_tasks: Mutex::new(Some((input_task, output_task))),
219 initialized: initialized_rx,
220 output_done_rx: Mutex::new(Some(output_done_rx)),
221 });
222
223 let root_uri = Url::from_file_path(root_path).map_err(|_| anyhow!("invalid root path"))?;
224 executor
225 .spawn({
226 let this = this.clone();
227 async move {
228 if let Some(capabilities) = this.init(root_uri).log_err().await {
229 *capabilities_tx.borrow_mut() = Some(capabilities);
230 }
231
232 drop(initialized_tx);
233 }
234 })
235 .detach();
236
237 Ok(this)
238 }
239
240 async fn init(self: Arc<Self>, root_uri: Url) -> Result<ServerCapabilities> {
241 #[allow(deprecated)]
242 let params = InitializeParams {
243 process_id: Default::default(),
244 root_path: Default::default(),
245 root_uri: Some(root_uri),
246 initialization_options: Default::default(),
247 capabilities: ClientCapabilities {
248 text_document: Some(TextDocumentClientCapabilities {
249 definition: Some(GotoCapability {
250 link_support: Some(true),
251 ..Default::default()
252 }),
253 code_action: Some(CodeActionClientCapabilities {
254 code_action_literal_support: Some(CodeActionLiteralSupport {
255 code_action_kind: CodeActionKindLiteralSupport {
256 value_set: vec![
257 CodeActionKind::REFACTOR.as_str().into(),
258 CodeActionKind::QUICKFIX.as_str().into(),
259 ],
260 },
261 }),
262 data_support: Some(true),
263 resolve_support: Some(CodeActionCapabilityResolveSupport {
264 properties: vec!["edit".to_string()],
265 }),
266 ..Default::default()
267 }),
268 completion: Some(CompletionClientCapabilities {
269 completion_item: Some(CompletionItemCapability {
270 snippet_support: Some(true),
271 resolve_support: Some(CompletionItemCapabilityResolveSupport {
272 properties: vec!["additionalTextEdits".to_string()],
273 }),
274 ..Default::default()
275 }),
276 ..Default::default()
277 }),
278 ..Default::default()
279 }),
280 experimental: Some(json!({
281 "serverStatusNotification": true,
282 })),
283 window: Some(WindowClientCapabilities {
284 work_done_progress: Some(true),
285 ..Default::default()
286 }),
287 ..Default::default()
288 },
289 trace: Default::default(),
290 workspace_folders: Default::default(),
291 client_info: Default::default(),
292 locale: Default::default(),
293 };
294
295 let this = self.clone();
296 let request = Self::request_internal::<request::Initialize>(
297 &this.next_id,
298 &this.response_handlers,
299 &this.outbound_tx,
300 params,
301 );
302 let response = request.await?;
303 Self::notify_internal::<notification::Initialized>(
304 &this.outbound_tx,
305 InitializedParams {},
306 )?;
307 Ok(response.capabilities)
308 }
309
310 pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
311 if let Some(tasks) = self.io_tasks.lock().take() {
312 let response_handlers = self.response_handlers.clone();
313 let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
314 let outbound_tx = self.outbound_tx.clone();
315 let mut output_done = self.output_done_rx.lock().take().unwrap();
316 let shutdown_request = Self::request_internal::<request::Shutdown>(
317 &next_id,
318 &response_handlers,
319 &outbound_tx,
320 (),
321 );
322 let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
323 outbound_tx.close();
324 Some(
325 async move {
326 shutdown_request.await?;
327 exit?;
328 output_done.recv().await;
329 drop(tasks);
330 Ok(())
331 }
332 .log_err(),
333 )
334 } else {
335 None
336 }
337 }
338
339 pub fn on_notification<T, F>(&self, mut f: F) -> Subscription
340 where
341 T: notification::Notification,
342 F: 'static + Send + Sync + FnMut(T::Params),
343 {
344 let prev_handler = self.notification_handlers.write().insert(
345 T::METHOD,
346 Box::new(
347 move |notification| match serde_json::from_str(notification) {
348 Ok(notification) => f(notification),
349 Err(err) => log::error!("error parsing notification {}: {}", T::METHOD, err),
350 },
351 ),
352 );
353
354 assert!(
355 prev_handler.is_none(),
356 "registered multiple handlers for the same notification"
357 );
358
359 Subscription {
360 method: T::METHOD,
361 notification_handlers: self.notification_handlers.clone(),
362 }
363 }
364
365 pub fn capabilities(&self) -> watch::Receiver<Option<ServerCapabilities>> {
366 self.capabilities.clone()
367 }
368
369 pub fn request<T: request::Request>(
370 self: &Arc<Self>,
371 params: T::Params,
372 ) -> impl Future<Output = Result<T::Result>>
373 where
374 T::Result: 'static + Send,
375 {
376 let this = self.clone();
377 async move {
378 this.initialized.clone().recv().await;
379 Self::request_internal::<T>(
380 &this.next_id,
381 &this.response_handlers,
382 &this.outbound_tx,
383 params,
384 )
385 .await
386 }
387 }
388
389 fn request_internal<T: request::Request>(
390 next_id: &AtomicUsize,
391 response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
392 outbound_tx: &channel::Sender<Vec<u8>>,
393 params: T::Params,
394 ) -> impl 'static + Future<Output = Result<T::Result>>
395 where
396 T::Result: 'static + Send,
397 {
398 let id = next_id.fetch_add(1, SeqCst);
399 let message = serde_json::to_vec(&Request {
400 jsonrpc: JSON_RPC_VERSION,
401 id,
402 method: T::METHOD,
403 params,
404 })
405 .unwrap();
406 let mut response_handlers = response_handlers.lock();
407 let (mut tx, mut rx) = oneshot::channel();
408 response_handlers.insert(
409 id,
410 Box::new(move |result| {
411 let response = match result {
412 Ok(response) => {
413 serde_json::from_str(response).context("failed to deserialize response")
414 }
415 Err(error) => Err(anyhow!("{}", error.message)),
416 };
417 let _ = tx.try_send(response);
418 }),
419 );
420
421 let send = outbound_tx
422 .try_send(message)
423 .context("failed to write to language server's stdin");
424 async move {
425 send?;
426 rx.recv().await.unwrap()
427 }
428 }
429
430 pub fn notify<T: notification::Notification>(
431 self: &Arc<Self>,
432 params: T::Params,
433 ) -> impl Future<Output = Result<()>> {
434 let this = self.clone();
435 async move {
436 this.initialized.clone().recv().await;
437 Self::notify_internal::<T>(&this.outbound_tx, params)?;
438 Ok(())
439 }
440 }
441
442 fn notify_internal<T: notification::Notification>(
443 outbound_tx: &channel::Sender<Vec<u8>>,
444 params: T::Params,
445 ) -> Result<()> {
446 let message = serde_json::to_vec(&Notification {
447 jsonrpc: JSON_RPC_VERSION,
448 method: T::METHOD,
449 params,
450 })
451 .unwrap();
452 outbound_tx.try_send(message)?;
453 Ok(())
454 }
455}
456
457impl Drop for LanguageServer {
458 fn drop(&mut self) {
459 if let Some(shutdown) = self.shutdown() {
460 self.executor.spawn(shutdown).detach();
461 }
462 }
463}
464
465impl Subscription {
466 pub fn detach(mut self) {
467 self.method = "";
468 }
469}
470
471impl Drop for Subscription {
472 fn drop(&mut self) {
473 self.notification_handlers.write().remove(self.method);
474 }
475}
476
477#[cfg(any(test, feature = "test-support"))]
478pub struct FakeLanguageServer {
479 handlers: Arc<
480 Mutex<
481 HashMap<
482 &'static str,
483 Box<dyn Send + FnMut(usize, &[u8], gpui::AsyncAppContext) -> Vec<u8>>,
484 >,
485 >,
486 >,
487 outgoing_tx: futures::channel::mpsc::UnboundedSender<Vec<u8>>,
488 incoming_rx: futures::channel::mpsc::UnboundedReceiver<Vec<u8>>,
489}
490
491#[cfg(any(test, feature = "test-support"))]
492impl LanguageServer {
493 pub fn fake(cx: &mut gpui::MutableAppContext) -> (Arc<Self>, FakeLanguageServer) {
494 Self::fake_with_capabilities(Default::default(), cx)
495 }
496
497 pub fn fake_with_capabilities(
498 capabilities: ServerCapabilities,
499 cx: &mut gpui::MutableAppContext,
500 ) -> (Arc<Self>, FakeLanguageServer) {
501 let (stdin_writer, stdin_reader) = async_pipe::pipe();
502 let (stdout_writer, stdout_reader) = async_pipe::pipe();
503
504 let mut fake = FakeLanguageServer::new(stdin_reader, stdout_writer, cx);
505 fake.handle_request::<request::Initialize, _>({
506 let capabilities = capabilities.clone();
507 move |_, _| InitializeResult {
508 capabilities: capabilities.clone(),
509 ..Default::default()
510 }
511 });
512
513 let server = Self::new_internal(
514 stdin_writer,
515 stdout_reader,
516 Path::new("/"),
517 cx.background().clone(),
518 )
519 .unwrap();
520
521 (server, fake)
522 }
523}
524
525#[cfg(any(test, feature = "test-support"))]
526impl FakeLanguageServer {
527 fn new(
528 stdin: async_pipe::PipeReader,
529 stdout: async_pipe::PipeWriter,
530 cx: &mut gpui::MutableAppContext,
531 ) -> Self {
532 use futures::StreamExt as _;
533
534 let (incoming_tx, incoming_rx) = futures::channel::mpsc::unbounded();
535 let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded();
536 let this = Self {
537 outgoing_tx: outgoing_tx.clone(),
538 incoming_rx,
539 handlers: Default::default(),
540 };
541
542 // Receive incoming messages
543 let handlers = this.handlers.clone();
544 cx.spawn(|cx| async move {
545 let mut buffer = Vec::new();
546 let mut stdin = smol::io::BufReader::new(stdin);
547 while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
548 cx.background().simulate_random_delay().await;
549 if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
550 assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
551
552 if let Some(handler) = handlers.lock().get_mut(request.method) {
553 let response =
554 handler(request.id, request.params.get().as_bytes(), cx.clone());
555 log::debug!("handled lsp request. method:{}", request.method);
556 outgoing_tx.unbounded_send(response)?;
557 } else {
558 log::debug!("unhandled lsp request. method:{}", request.method);
559 outgoing_tx.unbounded_send(
560 serde_json::to_vec(&AnyResponse {
561 id: request.id,
562 error: Some(Error {
563 message: "no handler".to_string(),
564 }),
565 result: None,
566 })
567 .unwrap(),
568 )?;
569 }
570 } else {
571 incoming_tx.unbounded_send(buffer.clone())?;
572 }
573 }
574 Ok::<_, anyhow::Error>(())
575 })
576 .detach();
577
578 // Send outgoing messages
579 cx.background()
580 .spawn(async move {
581 let mut stdout = smol::io::BufWriter::new(stdout);
582 while let Some(notification) = outgoing_rx.next().await {
583 Self::send(&mut stdout, ¬ification).await;
584 }
585 })
586 .detach();
587
588 this
589 }
590
591 pub async fn notify<T: notification::Notification>(&mut self, params: T::Params) {
592 let message = serde_json::to_vec(&Notification {
593 jsonrpc: JSON_RPC_VERSION,
594 method: T::METHOD,
595 params,
596 })
597 .unwrap();
598 self.outgoing_tx.unbounded_send(message).unwrap();
599 }
600
601 pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
602 use futures::StreamExt as _;
603
604 loop {
605 let bytes = self.incoming_rx.next().await.unwrap();
606 if let Ok(notification) = serde_json::from_slice::<Notification<T::Params>>(&bytes) {
607 assert_eq!(notification.method, T::METHOD);
608 return notification.params;
609 } else {
610 log::info!(
611 "skipping message in fake language server {:?}",
612 std::str::from_utf8(&bytes)
613 );
614 }
615 }
616 }
617
618 pub fn handle_request<T, F>(
619 &mut self,
620 mut handler: F,
621 ) -> futures::channel::mpsc::UnboundedReceiver<()>
622 where
623 T: 'static + request::Request,
624 F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> T::Result,
625 {
626 let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
627 self.handlers.lock().insert(
628 T::METHOD,
629 Box::new(move |id, params, cx| {
630 let result = handler(serde_json::from_slice::<T::Params>(params).unwrap(), cx);
631 let result = serde_json::to_string(&result).unwrap();
632 let result = serde_json::from_str::<&RawValue>(&result).unwrap();
633 let response = AnyResponse {
634 id,
635 error: None,
636 result: Some(result),
637 };
638 responded_tx.unbounded_send(()).ok();
639 serde_json::to_vec(&response).unwrap()
640 }),
641 );
642 responded_rx
643 }
644
645 pub fn remove_request_handler<T>(&mut self)
646 where
647 T: 'static + request::Request,
648 {
649 self.handlers.lock().remove(T::METHOD);
650 }
651
652 pub async fn start_progress(&mut self, token: impl Into<String>) {
653 self.notify::<notification::Progress>(ProgressParams {
654 token: NumberOrString::String(token.into()),
655 value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
656 })
657 .await;
658 }
659
660 pub async fn end_progress(&mut self, token: impl Into<String>) {
661 self.notify::<notification::Progress>(ProgressParams {
662 token: NumberOrString::String(token.into()),
663 value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
664 })
665 .await;
666 }
667
668 async fn send(stdout: &mut smol::io::BufWriter<async_pipe::PipeWriter>, message: &[u8]) {
669 stdout
670 .write_all(CONTENT_LEN_HEADER.as_bytes())
671 .await
672 .unwrap();
673 stdout
674 .write_all((format!("{}", message.len())).as_bytes())
675 .await
676 .unwrap();
677 stdout.write_all("\r\n\r\n".as_bytes()).await.unwrap();
678 stdout.write_all(&message).await.unwrap();
679 stdout.flush().await.unwrap();
680 }
681
682 async fn receive(
683 stdin: &mut smol::io::BufReader<async_pipe::PipeReader>,
684 buffer: &mut Vec<u8>,
685 ) -> Result<()> {
686 buffer.clear();
687 stdin.read_until(b'\n', buffer).await?;
688 stdin.read_until(b'\n', buffer).await?;
689 let message_len: usize = std::str::from_utf8(buffer)
690 .unwrap()
691 .strip_prefix(CONTENT_LEN_HEADER)
692 .unwrap()
693 .trim_end()
694 .parse()
695 .unwrap();
696 buffer.resize(message_len, 0);
697 stdin.read_exact(buffer).await?;
698 Ok(())
699 }
700}
701
702#[cfg(test)]
703mod tests {
704 use super::*;
705 use gpui::TestAppContext;
706
707 #[ctor::ctor]
708 fn init_logger() {
709 if std::env::var("RUST_LOG").is_ok() {
710 env_logger::init();
711 }
712 }
713
714 #[gpui::test]
715 async fn test_fake(mut cx: TestAppContext) {
716 let (server, mut fake) = cx.update(LanguageServer::fake);
717
718 let (message_tx, message_rx) = channel::unbounded();
719 let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
720 server
721 .on_notification::<notification::ShowMessage, _>(move |params| {
722 message_tx.try_send(params).unwrap()
723 })
724 .detach();
725 server
726 .on_notification::<notification::PublishDiagnostics, _>(move |params| {
727 diagnostics_tx.try_send(params).unwrap()
728 })
729 .detach();
730
731 server
732 .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
733 text_document: TextDocumentItem::new(
734 Url::from_str("file://a/b").unwrap(),
735 "rust".to_string(),
736 0,
737 "".to_string(),
738 ),
739 })
740 .await
741 .unwrap();
742 assert_eq!(
743 fake.receive_notification::<notification::DidOpenTextDocument>()
744 .await
745 .text_document
746 .uri
747 .as_str(),
748 "file://a/b"
749 );
750
751 fake.notify::<notification::ShowMessage>(ShowMessageParams {
752 typ: MessageType::ERROR,
753 message: "ok".to_string(),
754 })
755 .await;
756 fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
757 uri: Url::from_str("file://b/c").unwrap(),
758 version: Some(5),
759 diagnostics: vec![],
760 })
761 .await;
762 assert_eq!(message_rx.recv().await.unwrap().message, "ok");
763 assert_eq!(
764 diagnostics_rx.recv().await.unwrap().uri.as_str(),
765 "file://b/c"
766 );
767
768 fake.handle_request::<request::Shutdown, _>(|_, _| ());
769
770 drop(server);
771 fake.receive_notification::<notification::Exit>().await;
772 }
773
774 pub enum ServerStatusNotification {}
775
776 impl notification::Notification for ServerStatusNotification {
777 type Params = ServerStatusParams;
778 const METHOD: &'static str = "experimental/serverStatus";
779 }
780
781 #[derive(Deserialize, Serialize, PartialEq, Eq, Clone)]
782 pub struct ServerStatusParams {
783 pub quiescent: bool,
784 }
785}