1use anyhow::{anyhow, Context, Result};
2use collections::HashMap;
3use futures::{channel::oneshot, io::BufWriter, AsyncRead, AsyncWrite};
4use gpui::{executor, Task};
5use parking_lot::{Mutex, RwLock};
6use postage::{barrier, prelude::Stream, watch};
7use serde::{Deserialize, Serialize};
8use serde_json::{json, value::RawValue, Value};
9use smol::{
10 channel,
11 io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
12 process::Command,
13};
14use std::{
15 future::Future,
16 io::Write,
17 str::FromStr,
18 sync::{
19 atomic::{AtomicUsize, Ordering::SeqCst},
20 Arc,
21 },
22};
23use std::{path::Path, process::Stdio};
24use util::TryFutureExt;
25
26pub use lsp_types::*;
27
/// Value placed in the `jsonrpc` field of every request and notification this module builds.
const JSON_RPC_VERSION: &str = "2.0";
/// Header prefix (including its trailing space) written before each message's byte length.
const CONTENT_LEN_HEADER: &str = "Content-Length: ";
30
31type NotificationHandler = Box<dyn Send + Sync + FnMut(&str)>;
32type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
33
34pub struct LanguageServer {
35 next_id: AtomicUsize,
36 outbound_tx: channel::Sender<Vec<u8>>,
37 capabilities: watch::Receiver<Option<ServerCapabilities>>,
38 notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
39 response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
40 executor: Arc<executor::Background>,
41 io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
42 initialized: barrier::Receiver,
43 output_done_rx: Mutex<Option<barrier::Receiver>>,
44}
45
46pub struct Subscription {
47 method: &'static str,
48 notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
49}
50
51#[derive(Serialize, Deserialize)]
52struct Request<'a, T> {
53 jsonrpc: &'a str,
54 id: usize,
55 method: &'a str,
56 params: T,
57}
58
/// A request of any method whose `params` are kept as unparsed JSON.
///
/// Only compiled for tests / the `test-support` feature.
#[cfg(any(test, feature = "test-support"))]
#[derive(Deserialize)]
struct AnyRequest<'a> {
    /// Identifier to echo back in the response.
    id: usize,
    /// Protocol version string, borrowed from the input buffer.
    #[serde(borrow)]
    jsonrpc: &'a str,
    /// Method name, borrowed from the input buffer.
    #[serde(borrow)]
    method: &'a str,
    /// Raw, still-serialized parameters.
    #[serde(borrow)]
    params: &'a RawValue,
}
70
71#[derive(Serialize, Deserialize)]
72struct AnyResponse<'a> {
73 id: usize,
74 #[serde(default)]
75 error: Option<Error>,
76 #[serde(borrow)]
77 result: Option<&'a RawValue>,
78}
79
80#[derive(Serialize, Deserialize)]
81struct Notification<'a, T> {
82 #[serde(borrow)]
83 jsonrpc: &'a str,
84 #[serde(borrow)]
85 method: &'a str,
86 params: T,
87}
88
89#[derive(Deserialize)]
90struct AnyNotification<'a> {
91 #[serde(borrow)]
92 method: &'a str,
93 #[serde(borrow)]
94 params: &'a RawValue,
95}
96
97#[derive(Debug, Serialize, Deserialize)]
98struct Error {
99 message: String,
100}
101
102impl LanguageServer {
103 pub fn new(
104 binary_path: &Path,
105 args: &[&str],
106 options: Option<Value>,
107 root_path: &Path,
108 background: Arc<executor::Background>,
109 ) -> Result<Arc<Self>> {
110 let mut server = Command::new(binary_path)
111 .current_dir(root_path)
112 .args(args)
113 .stdin(Stdio::piped())
114 .stdout(Stdio::piped())
115 .stderr(Stdio::inherit())
116 .spawn()?;
117 let stdin = server.stdin.take().unwrap();
118 let stdout = server.stdout.take().unwrap();
119 Self::new_internal(stdin, stdout, root_path, options, background)
120 }
121
122 fn new_internal<Stdin, Stdout>(
123 stdin: Stdin,
124 stdout: Stdout,
125 root_path: &Path,
126 options: Option<Value>,
127 executor: Arc<executor::Background>,
128 ) -> Result<Arc<Self>>
129 where
130 Stdin: AsyncWrite + Unpin + Send + 'static,
131 Stdout: AsyncRead + Unpin + Send + 'static,
132 {
133 let mut stdin = BufWriter::new(stdin);
134 let mut stdout = BufReader::new(stdout);
135 let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
136 let notification_handlers =
137 Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::default()));
138 let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::default()));
139 let input_task = executor.spawn(
140 {
141 let notification_handlers = notification_handlers.clone();
142 let response_handlers = response_handlers.clone();
143 async move {
144 let _clear_response_handlers = ClearResponseHandlers(response_handlers.clone());
145 let mut buffer = Vec::new();
146 loop {
147 buffer.clear();
148 stdout.read_until(b'\n', &mut buffer).await?;
149 stdout.read_until(b'\n', &mut buffer).await?;
150 let message_len: usize = std::str::from_utf8(&buffer)?
151 .strip_prefix(CONTENT_LEN_HEADER)
152 .ok_or_else(|| anyhow!("invalid header"))?
153 .trim_end()
154 .parse()?;
155
156 buffer.resize(message_len, 0);
157 stdout.read_exact(&mut buffer).await?;
158
159 if let Ok(AnyNotification { method, params }) =
160 serde_json::from_slice(&buffer)
161 {
162 if let Some(handler) = notification_handlers.write().get_mut(method) {
163 handler(params.get());
164 } else {
165 log::info!(
166 "unhandled notification {}:\n{}",
167 method,
168 serde_json::to_string_pretty(
169 &Value::from_str(params.get()).unwrap()
170 )
171 .unwrap()
172 );
173 }
174 } else if let Ok(AnyResponse { id, error, result }) =
175 serde_json::from_slice(&buffer)
176 {
177 if let Some(handler) = response_handlers.lock().remove(&id) {
178 if let Some(error) = error {
179 handler(Err(error));
180 } else if let Some(result) = result {
181 handler(Ok(result.get()));
182 } else {
183 handler(Ok("null"));
184 }
185 }
186 } else {
187 return Err(anyhow!(
188 "failed to deserialize message:\n{}",
189 std::str::from_utf8(&buffer)?
190 ));
191 }
192 }
193 }
194 }
195 .log_err(),
196 );
197 let (output_done_tx, output_done_rx) = barrier::channel();
198 let output_task = executor.spawn({
199 let response_handlers = response_handlers.clone();
200 async move {
201 let _clear_response_handlers = ClearResponseHandlers(response_handlers);
202 let mut content_len_buffer = Vec::new();
203 while let Ok(message) = outbound_rx.recv().await {
204 content_len_buffer.clear();
205 write!(content_len_buffer, "{}", message.len()).unwrap();
206 stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
207 stdin.write_all(&content_len_buffer).await?;
208 stdin.write_all("\r\n\r\n".as_bytes()).await?;
209 stdin.write_all(&message).await?;
210 stdin.flush().await?;
211 }
212 drop(output_done_tx);
213 Ok(())
214 }
215 .log_err()
216 });
217
218 let (initialized_tx, initialized_rx) = barrier::channel();
219 let (mut capabilities_tx, capabilities_rx) = watch::channel();
220 let this = Arc::new(Self {
221 notification_handlers,
222 response_handlers,
223 capabilities: capabilities_rx,
224 next_id: Default::default(),
225 outbound_tx,
226 executor: executor.clone(),
227 io_tasks: Mutex::new(Some((input_task, output_task))),
228 initialized: initialized_rx,
229 output_done_rx: Mutex::new(Some(output_done_rx)),
230 });
231
232 let root_uri = Url::from_file_path(root_path).map_err(|_| anyhow!("invalid root path"))?;
233 executor
234 .spawn({
235 let this = this.clone();
236 async move {
237 if let Some(capabilities) = this.init(root_uri, options).log_err().await {
238 *capabilities_tx.borrow_mut() = Some(capabilities);
239 }
240
241 drop(initialized_tx);
242 }
243 })
244 .detach();
245
246 Ok(this)
247 }
248
249 async fn init(
250 self: Arc<Self>,
251 root_uri: Url,
252 options: Option<Value>,
253 ) -> Result<ServerCapabilities> {
254 #[allow(deprecated)]
255 let params = InitializeParams {
256 process_id: Default::default(),
257 root_path: Default::default(),
258 root_uri: Some(root_uri),
259 initialization_options: options,
260 capabilities: ClientCapabilities {
261 text_document: Some(TextDocumentClientCapabilities {
262 definition: Some(GotoCapability {
263 link_support: Some(true),
264 ..Default::default()
265 }),
266 code_action: Some(CodeActionClientCapabilities {
267 code_action_literal_support: Some(CodeActionLiteralSupport {
268 code_action_kind: CodeActionKindLiteralSupport {
269 value_set: vec![
270 CodeActionKind::REFACTOR.as_str().into(),
271 CodeActionKind::QUICKFIX.as_str().into(),
272 ],
273 },
274 }),
275 data_support: Some(true),
276 resolve_support: Some(CodeActionCapabilityResolveSupport {
277 properties: vec!["edit".to_string()],
278 }),
279 ..Default::default()
280 }),
281 completion: Some(CompletionClientCapabilities {
282 completion_item: Some(CompletionItemCapability {
283 snippet_support: Some(true),
284 resolve_support: Some(CompletionItemCapabilityResolveSupport {
285 properties: vec!["additionalTextEdits".to_string()],
286 }),
287 ..Default::default()
288 }),
289 ..Default::default()
290 }),
291 ..Default::default()
292 }),
293 experimental: Some(json!({
294 "serverStatusNotification": true,
295 })),
296 window: Some(WindowClientCapabilities {
297 work_done_progress: Some(true),
298 ..Default::default()
299 }),
300 ..Default::default()
301 },
302 trace: Default::default(),
303 workspace_folders: Default::default(),
304 client_info: Default::default(),
305 locale: Default::default(),
306 };
307
308 let this = self.clone();
309 let request = Self::request_internal::<request::Initialize>(
310 &this.next_id,
311 &this.response_handlers,
312 &this.outbound_tx,
313 params,
314 );
315 let response = request.await?;
316 Self::notify_internal::<notification::Initialized>(
317 &this.outbound_tx,
318 InitializedParams {},
319 )?;
320 Ok(response.capabilities)
321 }
322
323 pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
324 if let Some(tasks) = self.io_tasks.lock().take() {
325 let response_handlers = self.response_handlers.clone();
326 let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
327 let outbound_tx = self.outbound_tx.clone();
328 let mut output_done = self.output_done_rx.lock().take().unwrap();
329 let shutdown_request = Self::request_internal::<request::Shutdown>(
330 &next_id,
331 &response_handlers,
332 &outbound_tx,
333 (),
334 );
335 let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
336 outbound_tx.close();
337 Some(
338 async move {
339 log::debug!("language server shutdown started");
340 shutdown_request.await?;
341 response_handlers.lock().clear();
342 exit?;
343 output_done.recv().await;
344 log::debug!("language server shutdown finished");
345 drop(tasks);
346 Ok(())
347 }
348 .log_err(),
349 )
350 } else {
351 None
352 }
353 }
354
355 pub fn on_notification<T, F>(&self, mut f: F) -> Subscription
356 where
357 T: notification::Notification,
358 F: 'static + Send + Sync + FnMut(T::Params),
359 {
360 let prev_handler = self.notification_handlers.write().insert(
361 T::METHOD,
362 Box::new(
363 move |notification| match serde_json::from_str(notification) {
364 Ok(notification) => f(notification),
365 Err(err) => log::error!("error parsing notification {}: {}", T::METHOD, err),
366 },
367 ),
368 );
369
370 assert!(
371 prev_handler.is_none(),
372 "registered multiple handlers for the same notification"
373 );
374
375 Subscription {
376 method: T::METHOD,
377 notification_handlers: self.notification_handlers.clone(),
378 }
379 }
380
381 pub fn capabilities(&self) -> impl 'static + Future<Output = Option<ServerCapabilities>> {
382 let mut rx = self.capabilities.clone();
383 async move {
384 loop {
385 let value = rx.recv().await?;
386 if value.is_some() {
387 return value;
388 }
389 }
390 }
391 }
392
393 pub fn request<T: request::Request>(
394 self: &Arc<Self>,
395 params: T::Params,
396 ) -> impl Future<Output = Result<T::Result>>
397 where
398 T::Result: 'static + Send,
399 {
400 let this = self.clone();
401 async move {
402 this.initialized.clone().recv().await;
403 Self::request_internal::<T>(
404 &this.next_id,
405 &this.response_handlers,
406 &this.outbound_tx,
407 params,
408 )
409 .await
410 }
411 }
412
413 fn request_internal<T: request::Request>(
414 next_id: &AtomicUsize,
415 response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
416 outbound_tx: &channel::Sender<Vec<u8>>,
417 params: T::Params,
418 ) -> impl 'static + Future<Output = Result<T::Result>>
419 where
420 T::Result: 'static + Send,
421 {
422 let id = next_id.fetch_add(1, SeqCst);
423 let message = serde_json::to_vec(&Request {
424 jsonrpc: JSON_RPC_VERSION,
425 id,
426 method: T::METHOD,
427 params,
428 })
429 .unwrap();
430
431 let send = outbound_tx
432 .try_send(message)
433 .context("failed to write to language server's stdin");
434
435 let (tx, rx) = oneshot::channel();
436 response_handlers.lock().insert(
437 id,
438 Box::new(move |result| {
439 let response = match result {
440 Ok(response) => {
441 serde_json::from_str(response).context("failed to deserialize response")
442 }
443 Err(error) => Err(anyhow!("{}", error.message)),
444 };
445 let _ = tx.send(response);
446 }),
447 );
448
449 async move {
450 send?;
451 rx.await?
452 }
453 }
454
455 pub fn notify<T: notification::Notification>(
456 self: &Arc<Self>,
457 params: T::Params,
458 ) -> impl Future<Output = Result<()>> {
459 let this = self.clone();
460 async move {
461 this.initialized.clone().recv().await;
462 Self::notify_internal::<T>(&this.outbound_tx, params)?;
463 Ok(())
464 }
465 }
466
467 fn notify_internal<T: notification::Notification>(
468 outbound_tx: &channel::Sender<Vec<u8>>,
469 params: T::Params,
470 ) -> Result<()> {
471 let message = serde_json::to_vec(&Notification {
472 jsonrpc: JSON_RPC_VERSION,
473 method: T::METHOD,
474 params,
475 })
476 .unwrap();
477 outbound_tx.try_send(message)?;
478 Ok(())
479 }
480}
481
482impl Drop for LanguageServer {
483 fn drop(&mut self) {
484 if let Some(shutdown) = self.shutdown() {
485 self.executor.spawn(shutdown).detach();
486 }
487 }
488}
489
490impl Subscription {
491 pub fn detach(mut self) {
492 self.method = "";
493 }
494}
495
496impl Drop for Subscription {
497 fn drop(&mut self) {
498 self.notification_handlers.write().remove(self.method);
499 }
500}
501
/// In-process stand-in for a language server, for tests.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeLanguageServer {
    /// Request handlers, keyed by method name.
    handlers: FakeLanguageServerHandlers,
    /// Serialized messages queued for delivery to the client.
    outgoing_tx: futures::channel::mpsc::UnboundedSender<Vec<u8>>,
    /// Raw client messages that did not parse as requests.
    incoming_rx: futures::channel::mpsc::UnboundedReceiver<Vec<u8>>,
    /// Task reading client messages; held so it lives as long as the fake server.
    _input_task: Task<Result<()>>,
    /// Task writing queued messages; held so it lives as long as the fake server.
    _output_task: Task<Result<()>>,
}
510
/// Shared table of fake request handlers, keyed by method name.
///
/// Each handler receives the request id, the raw JSON bytes of the params, and an app
/// context, and returns the serialized response.
#[cfg(any(test, feature = "test-support"))]
type FakeLanguageServerHandlers = Arc<
    Mutex<
        HashMap<
            &'static str,
            Box<dyn FnMut(usize, &[u8], gpui::AsyncAppContext) -> Vec<u8> + Send>,
        >,
    >,
>;
520
521#[cfg(any(test, feature = "test-support"))]
522impl LanguageServer {
523 pub fn full_capabilities() -> ServerCapabilities {
524 ServerCapabilities {
525 document_highlight_provider: Some(OneOf::Left(true)),
526 code_action_provider: Some(CodeActionProviderCapability::Simple(true)),
527 document_formatting_provider: Some(OneOf::Left(true)),
528 document_range_formatting_provider: Some(OneOf::Left(true)),
529 ..Default::default()
530 }
531 }
532
533 pub fn fake(cx: &mut gpui::MutableAppContext) -> (Arc<Self>, FakeLanguageServer) {
534 Self::fake_with_capabilities(Self::full_capabilities(), cx)
535 }
536
537 pub fn fake_with_capabilities(
538 capabilities: ServerCapabilities,
539 cx: &mut gpui::MutableAppContext,
540 ) -> (Arc<Self>, FakeLanguageServer) {
541 let (stdin_writer, stdin_reader) = async_pipe::pipe();
542 let (stdout_writer, stdout_reader) = async_pipe::pipe();
543
544 let mut fake = FakeLanguageServer::new(stdin_reader, stdout_writer, cx);
545 fake.handle_request::<request::Initialize, _>({
546 let capabilities = capabilities.clone();
547 move |_, _| InitializeResult {
548 capabilities: capabilities.clone(),
549 ..Default::default()
550 }
551 });
552
553 let server = Self::new_internal(
554 stdin_writer,
555 stdout_reader,
556 Path::new("/"),
557 None,
558 cx.background().clone(),
559 )
560 .unwrap();
561
562 (server, fake)
563 }
564}
565
566#[cfg(any(test, feature = "test-support"))]
567impl FakeLanguageServer {
568 fn new(
569 stdin: async_pipe::PipeReader,
570 stdout: async_pipe::PipeWriter,
571 cx: &mut gpui::MutableAppContext,
572 ) -> Self {
573 use futures::StreamExt as _;
574
575 let (incoming_tx, incoming_rx) = futures::channel::mpsc::unbounded();
576 let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded();
577 let handlers = FakeLanguageServerHandlers::default();
578
579 let input_task = cx.spawn(|cx| {
580 let handlers = handlers.clone();
581 let outgoing_tx = outgoing_tx.clone();
582 async move {
583 let mut buffer = Vec::new();
584 let mut stdin = smol::io::BufReader::new(stdin);
585 while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
586 cx.background().simulate_random_delay().await;
587 if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
588 assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
589
590 let response;
591 if let Some(handler) = handlers.lock().get_mut(request.method) {
592 response =
593 handler(request.id, request.params.get().as_bytes(), cx.clone());
594 log::debug!("handled lsp request. method:{}", request.method);
595 } else {
596 response = serde_json::to_vec(&AnyResponse {
597 id: request.id,
598 error: Some(Error {
599 message: "no handler".to_string(),
600 }),
601 result: None,
602 })
603 .unwrap();
604 log::debug!("unhandled lsp request. method:{}", request.method);
605 }
606 outgoing_tx.unbounded_send(response)?;
607 } else {
608 incoming_tx.unbounded_send(buffer.clone())?;
609 }
610 }
611 Ok::<_, anyhow::Error>(())
612 }
613 });
614
615 let output_task = cx.background().spawn(async move {
616 let mut stdout = smol::io::BufWriter::new(stdout);
617 while let Some(message) = outgoing_rx.next().await {
618 stdout
619 .write_all(CONTENT_LEN_HEADER.as_bytes())
620 .await
621 .unwrap();
622 stdout
623 .write_all((format!("{}", message.len())).as_bytes())
624 .await
625 .unwrap();
626 stdout.write_all("\r\n\r\n".as_bytes()).await.unwrap();
627 stdout.write_all(&message).await.unwrap();
628 stdout.flush().await.unwrap();
629 }
630 Ok(())
631 });
632
633 Self {
634 outgoing_tx,
635 incoming_rx,
636 handlers,
637 _input_task: input_task,
638 _output_task: output_task,
639 }
640 }
641
642 pub async fn notify<T: notification::Notification>(&mut self, params: T::Params) {
643 let message = serde_json::to_vec(&Notification {
644 jsonrpc: JSON_RPC_VERSION,
645 method: T::METHOD,
646 params,
647 })
648 .unwrap();
649 self.outgoing_tx.unbounded_send(message).unwrap();
650 }
651
652 pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
653 use futures::StreamExt as _;
654
655 loop {
656 let bytes = self.incoming_rx.next().await.unwrap();
657 if let Ok(notification) = serde_json::from_slice::<Notification<T::Params>>(&bytes) {
658 assert_eq!(notification.method, T::METHOD);
659 return notification.params;
660 } else {
661 log::info!(
662 "skipping message in fake language server {:?}",
663 std::str::from_utf8(&bytes)
664 );
665 }
666 }
667 }
668
669 pub fn handle_request<T, F>(
670 &mut self,
671 mut handler: F,
672 ) -> futures::channel::mpsc::UnboundedReceiver<()>
673 where
674 T: 'static + request::Request,
675 F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> T::Result,
676 {
677 let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
678 self.handlers.lock().insert(
679 T::METHOD,
680 Box::new(move |id, params, cx| {
681 let result = handler(serde_json::from_slice::<T::Params>(params).unwrap(), cx);
682 let result = serde_json::to_string(&result).unwrap();
683 let result = serde_json::from_str::<&RawValue>(&result).unwrap();
684 let response = AnyResponse {
685 id,
686 error: None,
687 result: Some(result),
688 };
689 responded_tx.unbounded_send(()).ok();
690 serde_json::to_vec(&response).unwrap()
691 }),
692 );
693 responded_rx
694 }
695
696 pub fn remove_request_handler<T>(&mut self)
697 where
698 T: 'static + request::Request,
699 {
700 self.handlers.lock().remove(T::METHOD);
701 }
702
703 pub async fn start_progress(&mut self, token: impl Into<String>) {
704 self.notify::<notification::Progress>(ProgressParams {
705 token: NumberOrString::String(token.into()),
706 value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
707 })
708 .await;
709 }
710
711 pub async fn end_progress(&mut self, token: impl Into<String>) {
712 self.notify::<notification::Progress>(ProgressParams {
713 token: NumberOrString::String(token.into()),
714 value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
715 })
716 .await;
717 }
718
719 async fn receive(
720 stdin: &mut smol::io::BufReader<async_pipe::PipeReader>,
721 buffer: &mut Vec<u8>,
722 ) -> Result<()> {
723 buffer.clear();
724 stdin.read_until(b'\n', buffer).await?;
725 stdin.read_until(b'\n', buffer).await?;
726 let message_len: usize = std::str::from_utf8(buffer)
727 .unwrap()
728 .strip_prefix(CONTENT_LEN_HEADER)
729 .ok_or_else(|| anyhow!("invalid content length header"))?
730 .trim_end()
731 .parse()
732 .unwrap();
733 buffer.resize(message_len, 0);
734 stdin.read_exact(buffer).await?;
735 Ok(())
736 }
737}
738
739struct ClearResponseHandlers(Arc<Mutex<HashMap<usize, ResponseHandler>>>);
740
741impl Drop for ClearResponseHandlers {
742 fn drop(&mut self) {
743 self.0.lock().clear();
744 }
745}
746
#[cfg(test)]
mod tests {
    use super::*;
    use gpui::TestAppContext;

    /// Enables logging for the test binary when `RUST_LOG` is set.
    #[ctor::ctor]
    fn init_logger() {
        let logging_requested = std::env::var("RUST_LOG").is_ok();
        if logging_requested {
            env_logger::init();
        }
    }

    /// Round-trips notifications in both directions through the fake server and checks
    /// that dropping the client delivers an `Exit` notification.
    #[gpui::test]
    async fn test_fake(cx: &mut TestAppContext) {
        let (server, mut fake) = cx.update(LanguageServer::fake);

        // Forward server-to-client notifications into channels the test can await.
        let (shown_tx, shown_rx) = channel::unbounded();
        let (published_tx, published_rx) = channel::unbounded();
        server
            .on_notification::<notification::ShowMessage, _>(move |params| {
                shown_tx.try_send(params).unwrap()
            })
            .detach();
        server
            .on_notification::<notification::PublishDiagnostics, _>(move |params| {
                published_tx.try_send(params).unwrap()
            })
            .detach();

        // Client -> server.
        let opened_document = TextDocumentItem::new(
            Url::from_str("file://a/b").unwrap(),
            "rust".to_string(),
            0,
            "".to_string(),
        );
        server
            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
                text_document: opened_document,
            })
            .await
            .unwrap();
        let received = fake
            .receive_notification::<notification::DidOpenTextDocument>()
            .await;
        assert_eq!(received.text_document.uri.as_str(), "file://a/b");

        // Server -> client.
        fake.notify::<notification::ShowMessage>(ShowMessageParams {
            typ: MessageType::ERROR,
            message: "ok".to_string(),
        })
        .await;
        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
            uri: Url::from_str("file://b/c").unwrap(),
            version: Some(5),
            diagnostics: vec![],
        })
        .await;
        let shown = shown_rx.recv().await.unwrap();
        assert_eq!(shown.message, "ok");
        let published = published_rx.recv().await.unwrap();
        assert_eq!(published.uri.as_str(), "file://b/c");

        // Dropping the client starts shutdown, which ends with an `Exit` notification.
        fake.handle_request::<request::Shutdown, _>(|_, _| ());

        drop(server);
        fake.receive_notification::<notification::Exit>().await;
    }

    /// Marker type for the `experimental/serverStatus` notification.
    pub enum ServerStatusNotification {}

    impl notification::Notification for ServerStatusNotification {
        type Params = ServerStatusParams;
        const METHOD: &'static str = "experimental/serverStatus";
    }

    /// Parameters of the `experimental/serverStatus` notification.
    #[derive(Deserialize, Serialize, PartialEq, Eq, Clone)]
    pub struct ServerStatusParams {
        pub quiescent: bool,
    }
}