1use anyhow::{anyhow, Context, Result};
2use futures::channel::oneshot;
3use futures::{io::BufWriter, AsyncRead, AsyncWrite};
4use gpui::{executor, Task};
5use parking_lot::{Mutex, RwLock};
6use postage::{barrier, prelude::Stream, watch};
7use serde::{Deserialize, Serialize};
8use serde_json::{json, value::RawValue, Value};
9use smol::{
10 channel,
11 io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
12 process::Command,
13};
14use std::{
15 collections::HashMap,
16 future::Future,
17 io::Write,
18 str::FromStr,
19 sync::{
20 atomic::{AtomicUsize, Ordering::SeqCst},
21 Arc,
22 },
23};
24use std::{path::Path, process::Stdio};
25use util::TryFutureExt;
26
27pub use lsp_types::*;
28
/// Protocol version string written into the `jsonrpc` field of outgoing messages.
const JSON_RPC_VERSION: &str = "2.0";
/// Prefix of the header line that announces the byte length of a message body.
const CONTENT_LEN_HEADER: &str = "Content-Length: ";
31
32type NotificationHandler = Box<dyn Send + Sync + FnMut(&str)>;
33type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
34
/// Client-side handle to a language server that is spoken to over a pair of byte streams.
pub struct LanguageServer {
    /// Source of ids for outgoing requests; bumped once per request.
    next_id: AtomicUsize,
    /// Queue of serialized messages that the output task writes to the server.
    outbound_tx: channel::Sender<Vec<u8>>,
    /// Set to `Some` once the initialize request has succeeded; `None` until then.
    capabilities: watch::Receiver<Option<ServerCapabilities>>,
    /// Notification callbacks keyed by method name.
    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
    /// Callbacks for requests that are still awaiting a response, keyed by request id.
    response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
    /// Background executor; used on drop to run the shutdown future.
    executor: Arc<executor::Background>,
    /// The (input, output) I/O tasks; taken by the first call to `shutdown`.
    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
    /// Barrier released once the initialization attempt has finished, whether or not it
    /// succeeded. `request` and `notify` wait on it before sending anything.
    initialized: barrier::Receiver,
    /// Barrier released when the output task lets go of its sender; taken by `shutdown`.
    output_done_rx: Mutex<Option<barrier::Receiver>>,
}
46
/// Handle returned by `LanguageServer::on_notification`. Dropping it removes the handler
/// registered under `method` from the shared handler table.
pub struct Subscription {
    /// Method name the handler was registered under.
    method: &'static str,
    /// The handler table shared with the owning `LanguageServer`.
    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
}
51
/// Envelope of an outgoing JSON-RPC request carrying typed `params`.
#[derive(Serialize, Deserialize)]
struct Request<'a, T> {
    /// Protocol version string.
    jsonrpc: &'a str,
    /// Identifier that the matching response is expected to echo back.
    id: usize,
    /// Name of the method being invoked.
    method: &'a str,
    /// Method-specific parameters.
    params: T,
}
59
/// A request of any method whose `params` are kept as unparsed JSON. Only compiled for tests
/// and the `test-support` feature, where the fake server uses it to decode incoming requests.
#[cfg(any(test, feature = "test-support"))]
#[derive(Deserialize)]
struct AnyRequest<'a> {
    /// Identifier to echo back in the response.
    id: usize,
    /// Protocol version string, borrowed from the input buffer.
    #[serde(borrow)]
    jsonrpc: &'a str,
    /// Method name, borrowed from the input buffer.
    #[serde(borrow)]
    method: &'a str,
    /// Raw, still-serialized parameters, borrowed from the input buffer.
    #[serde(borrow)]
    params: &'a RawValue,
}
71
/// A response to any request, with the `result` kept as unparsed JSON.
#[derive(Serialize, Deserialize)]
struct AnyResponse<'a> {
    /// Identifier of the request this response answers.
    id: usize,
    /// Error reported by the peer; a missing field deserializes as `None`.
    #[serde(default)]
    error: Option<Error>,
    /// Raw, still-serialized result, borrowed from the input buffer.
    #[serde(borrow)]
    result: Option<&'a RawValue>,
}
80
/// Envelope of a JSON-RPC notification (a message without an id) carrying typed `params`.
#[derive(Serialize, Deserialize)]
struct Notification<'a, T> {
    /// Protocol version string.
    #[serde(borrow)]
    jsonrpc: &'a str,
    /// Name of the notification method.
    #[serde(borrow)]
    method: &'a str,
    /// Method-specific parameters.
    params: T,
}
89
/// A notification of any method whose `params` are kept as unparsed JSON, so the message can
/// be routed by `method` before the parameters are decoded.
#[derive(Deserialize)]
struct AnyNotification<'a> {
    /// Method name, borrowed from the input buffer.
    #[serde(borrow)]
    method: &'a str,
    /// Raw, still-serialized parameters, borrowed from the input buffer.
    #[serde(borrow)]
    params: &'a RawValue,
}
97
/// Error object carried by a response. Only the `message` field is modeled here.
#[derive(Debug, Serialize, Deserialize)]
struct Error {
    /// Human-readable description of the failure.
    message: String,
}
102
103impl LanguageServer {
104 pub fn new(
105 binary_path: &Path,
106 root_path: &Path,
107 background: Arc<executor::Background>,
108 ) -> Result<Arc<Self>> {
109 let mut server = Command::new(binary_path)
110 .stdin(Stdio::piped())
111 .stdout(Stdio::piped())
112 .stderr(Stdio::inherit())
113 .spawn()?;
114 let stdin = server.stdin.take().unwrap();
115 let stdout = server.stdout.take().unwrap();
116 Self::new_internal(stdin, stdout, root_path, background)
117 }
118
    /// Builds a `LanguageServer` on top of arbitrary byte streams: messages for the server are
    /// written to `stdin` and messages from the server are read from `stdout`.
    ///
    /// Three tasks are spawned on `executor`:
    /// - an input task that reads framed messages and dispatches them to the registered
    ///   notification / response handlers,
    /// - an output task that frames and writes every message queued on `outbound_tx`,
    /// - a detached task that runs `init` and then releases the `initialized` barrier.
    ///
    /// # Errors
    ///
    /// Fails if `root_path` cannot be converted into a file URL. Note that the I/O tasks have
    /// already been spawned by the time this conversion is attempted.
    fn new_internal<Stdin, Stdout>(
        stdin: Stdin,
        stdout: Stdout,
        root_path: &Path,
        executor: Arc<executor::Background>,
    ) -> Result<Arc<Self>>
    where
        Stdin: AsyncWrite + Unpin + Send + 'static,
        Stdout: AsyncRead + Unpin + Send + 'static,
    {
        let mut stdin = BufWriter::new(stdin);
        let mut stdout = BufReader::new(stdout);
        let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
        let notification_handlers = Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::new()));
        let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::new()));
        let input_task = executor.spawn(
            {
                let notification_handlers = notification_handlers.clone();
                let response_handlers = response_handlers.clone();
                async move {
                    // Guard: when this task ends for any reason, its `Drop` impl clears the
                    // table of pending response handlers.
                    let _clear_response_channels = ClearResponseChannels(response_handlers.clone());
                    let mut buffer = Vec::new();
                    loop {
                        buffer.clear();
                        // Two newline-terminated chunks are read into the same buffer; they are
                        // expected to be the `Content-Length` header line and the blank line
                        // after it.
                        // NOTE(review): any additional header line would make the length
                        // parsing below fail.
                        stdout.read_until(b'\n', &mut buffer).await?;
                        stdout.read_until(b'\n', &mut buffer).await?;
                        let message_len: usize = std::str::from_utf8(&buffer)?
                            .strip_prefix(CONTENT_LEN_HEADER)
                            .ok_or_else(|| anyhow!("invalid header"))?
                            .trim_end()
                            .parse()?;

                        // Reuse the buffer for the body: size it to the announced length and
                        // overwrite it completely.
                        buffer.resize(message_len, 0);
                        stdout.read_exact(&mut buffer).await?;

                        // The notification shape (`method` + `params`) is tried first, then the
                        // response shape (`id` plus optional `error` / `result`).
                        if let Ok(AnyNotification { method, params }) =
                            serde_json::from_slice(&buffer)
                        {
                            if let Some(handler) = notification_handlers.write().get_mut(method) {
                                handler(params.get());
                            } else {
                                log::info!(
                                    "unhandled notification {}:\n{}",
                                    method,
                                    serde_json::to_string_pretty(
                                        &Value::from_str(params.get()).unwrap()
                                    )
                                    .unwrap()
                                );
                            }
                        } else if let Ok(AnyResponse { id, error, result }) =
                            serde_json::from_slice(&buffer)
                        {
                            // The handler is removed from the table before it is called, so it
                            // runs at most once. Responses with no registered handler are
                            // ignored.
                            if let Some(handler) = response_handlers.lock().remove(&id) {
                                if let Some(error) = error {
                                    handler(Err(error));
                                } else if let Some(result) = result {
                                    handler(Ok(result.get()));
                                } else {
                                    // Neither field present: hand the handler a JSON `null`.
                                    handler(Ok("null"));
                                }
                            }
                        } else {
                            // A message matching neither shape ends the read loop.
                            return Err(anyhow!(
                                "failed to deserialize message:\n{}",
                                std::str::from_utf8(&buffer)?
                            ));
                        }
                    }
                }
            }
            // `log_err` comes from `util::TryFutureExt`; presumably it logs the error and
            // resolves to `None` — confirm against that crate.
            .log_err(),
        );
        let (output_done_tx, output_done_rx) = barrier::channel();
        let output_task = executor.spawn(
            async move {
                let mut content_len_buffer = Vec::new();
                // Runs until `recv` fails (presumably once the channel is closed and drained).
                while let Ok(message) = outbound_rx.recv().await {
                    content_len_buffer.clear();
                    write!(content_len_buffer, "{}", message.len()).unwrap();
                    // Frame: header prefix, decimal body length, blank line, body.
                    stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
                    stdin.write_all(&content_len_buffer).await?;
                    stdin.write_all("\r\n\r\n".as_bytes()).await?;
                    stdin.write_all(&message).await?;
                    stdin.flush().await?;
                }
                // Releasing the sender is what `shutdown` waits for via `output_done_rx`.
                drop(output_done_tx);
                Ok(())
            }
            .log_err(),
        );

        let (initialized_tx, initialized_rx) = barrier::channel();
        let (mut capabilities_tx, capabilities_rx) = watch::channel();
        let this = Arc::new(Self {
            notification_handlers,
            response_handlers,
            capabilities: capabilities_rx,
            next_id: Default::default(),
            outbound_tx,
            executor: executor.clone(),
            io_tasks: Mutex::new(Some((input_task, output_task))),
            initialized: initialized_rx,
            output_done_rx: Mutex::new(Some(output_done_rx)),
        });

        let root_uri = Url::from_file_path(root_path).map_err(|_| anyhow!("invalid root path"))?;
        executor
            .spawn({
                let this = this.clone();
                async move {
                    // Capabilities are published only if initialization succeeded.
                    if let Some(capabilities) = this.init(root_uri).log_err().await {
                        *capabilities_tx.borrow_mut() = Some(capabilities);
                    }

                    // Dropped in either case, so callers waiting on `initialized` are released
                    // even when initialization failed.
                    drop(initialized_tx);
                }
            })
            .detach();

        Ok(this)
    }
241
242 async fn init(self: Arc<Self>, root_uri: Url) -> Result<ServerCapabilities> {
243 #[allow(deprecated)]
244 let params = InitializeParams {
245 process_id: Default::default(),
246 root_path: Default::default(),
247 root_uri: Some(root_uri),
248 initialization_options: Default::default(),
249 capabilities: ClientCapabilities {
250 text_document: Some(TextDocumentClientCapabilities {
251 definition: Some(GotoCapability {
252 link_support: Some(true),
253 ..Default::default()
254 }),
255 code_action: Some(CodeActionClientCapabilities {
256 code_action_literal_support: Some(CodeActionLiteralSupport {
257 code_action_kind: CodeActionKindLiteralSupport {
258 value_set: vec![
259 CodeActionKind::REFACTOR.as_str().into(),
260 CodeActionKind::QUICKFIX.as_str().into(),
261 ],
262 },
263 }),
264 data_support: Some(true),
265 resolve_support: Some(CodeActionCapabilityResolveSupport {
266 properties: vec!["edit".to_string()],
267 }),
268 ..Default::default()
269 }),
270 completion: Some(CompletionClientCapabilities {
271 completion_item: Some(CompletionItemCapability {
272 snippet_support: Some(true),
273 resolve_support: Some(CompletionItemCapabilityResolveSupport {
274 properties: vec!["additionalTextEdits".to_string()],
275 }),
276 ..Default::default()
277 }),
278 ..Default::default()
279 }),
280 ..Default::default()
281 }),
282 experimental: Some(json!({
283 "serverStatusNotification": true,
284 })),
285 window: Some(WindowClientCapabilities {
286 work_done_progress: Some(true),
287 ..Default::default()
288 }),
289 ..Default::default()
290 },
291 trace: Default::default(),
292 workspace_folders: Default::default(),
293 client_info: Default::default(),
294 locale: Default::default(),
295 };
296
297 let this = self.clone();
298 let request = Self::request_internal::<request::Initialize>(
299 &this.next_id,
300 &this.response_handlers,
301 &this.outbound_tx,
302 params,
303 );
304 let response = request.await?;
305 Self::notify_internal::<notification::Initialized>(
306 &this.outbound_tx,
307 InitializedParams {},
308 )?;
309 Ok(response.capabilities)
310 }
311
312 pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
313 if let Some(tasks) = self.io_tasks.lock().take() {
314 let response_handlers = self.response_handlers.clone();
315 let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
316 let outbound_tx = self.outbound_tx.clone();
317 let mut output_done = self.output_done_rx.lock().take().unwrap();
318 let shutdown_request = Self::request_internal::<request::Shutdown>(
319 &next_id,
320 &response_handlers,
321 &outbound_tx,
322 (),
323 );
324 let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
325 outbound_tx.close();
326 Some(
327 async move {
328 log::debug!("language server shutdown started");
329 shutdown_request.await?;
330 response_handlers.lock().clear();
331 exit?;
332 output_done.recv().await;
333 log::debug!("language server shutdown finished");
334 drop(tasks);
335 Ok(())
336 }
337 .log_err(),
338 )
339 } else {
340 None
341 }
342 }
343
344 pub fn on_notification<T, F>(&self, mut f: F) -> Subscription
345 where
346 T: notification::Notification,
347 F: 'static + Send + Sync + FnMut(T::Params),
348 {
349 let prev_handler = self.notification_handlers.write().insert(
350 T::METHOD,
351 Box::new(
352 move |notification| match serde_json::from_str(notification) {
353 Ok(notification) => f(notification),
354 Err(err) => log::error!("error parsing notification {}: {}", T::METHOD, err),
355 },
356 ),
357 );
358
359 assert!(
360 prev_handler.is_none(),
361 "registered multiple handlers for the same notification"
362 );
363
364 Subscription {
365 method: T::METHOD,
366 notification_handlers: self.notification_handlers.clone(),
367 }
368 }
369
370 pub fn capabilities(&self) -> watch::Receiver<Option<ServerCapabilities>> {
371 self.capabilities.clone()
372 }
373
374 pub fn request<T: request::Request>(
375 self: &Arc<Self>,
376 params: T::Params,
377 ) -> impl Future<Output = Result<T::Result>>
378 where
379 T::Result: 'static + Send,
380 {
381 let this = self.clone();
382 async move {
383 this.initialized.clone().recv().await;
384 Self::request_internal::<T>(
385 &this.next_id,
386 &this.response_handlers,
387 &this.outbound_tx,
388 params,
389 )
390 .await
391 }
392 }
393
394 fn request_internal<T: request::Request>(
395 next_id: &AtomicUsize,
396 response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
397 outbound_tx: &channel::Sender<Vec<u8>>,
398 params: T::Params,
399 ) -> impl 'static + Future<Output = Result<T::Result>>
400 where
401 T::Result: 'static + Send,
402 {
403 let id = next_id.fetch_add(1, SeqCst);
404 let message = serde_json::to_vec(&Request {
405 jsonrpc: JSON_RPC_VERSION,
406 id,
407 method: T::METHOD,
408 params,
409 })
410 .unwrap();
411 let mut response_handlers = response_handlers.lock();
412 let (tx, rx) = oneshot::channel();
413 response_handlers.insert(
414 id,
415 Box::new(move |result| {
416 let response = match result {
417 Ok(response) => {
418 serde_json::from_str(response).context("failed to deserialize response")
419 }
420 Err(error) => Err(anyhow!("{}", error.message)),
421 };
422 let _ = tx.send(response);
423 }),
424 );
425
426 let send = outbound_tx
427 .try_send(message)
428 .context("failed to write to language server's stdin");
429 async move {
430 send?;
431 rx.await?
432 }
433 }
434
435 pub fn notify<T: notification::Notification>(
436 self: &Arc<Self>,
437 params: T::Params,
438 ) -> impl Future<Output = Result<()>> {
439 let this = self.clone();
440 async move {
441 this.initialized.clone().recv().await;
442 Self::notify_internal::<T>(&this.outbound_tx, params)?;
443 Ok(())
444 }
445 }
446
447 fn notify_internal<T: notification::Notification>(
448 outbound_tx: &channel::Sender<Vec<u8>>,
449 params: T::Params,
450 ) -> Result<()> {
451 let message = serde_json::to_vec(&Notification {
452 jsonrpc: JSON_RPC_VERSION,
453 method: T::METHOD,
454 params,
455 })
456 .unwrap();
457 outbound_tx.try_send(message)?;
458 Ok(())
459 }
460}
461
462impl Drop for LanguageServer {
463 fn drop(&mut self) {
464 if let Some(shutdown) = self.shutdown() {
465 self.executor.spawn(shutdown).detach();
466 }
467 }
468}
469
470impl Subscription {
471 pub fn detach(mut self) {
472 self.method = "";
473 }
474}
475
476impl Drop for Subscription {
477 fn drop(&mut self) {
478 self.notification_handlers.write().remove(self.method);
479 }
480}
481
/// In-process stand-in for a language server, available in tests and under the
/// `test-support` feature. It answers requests through registered handlers and lets the test
/// send and receive notifications.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeLanguageServer {
    /// Request handlers keyed by method name.
    handlers: FakeLanguageServerHandlers,
    /// Serialized messages queued for the output task to write to the client.
    outgoing_tx: futures::channel::mpsc::UnboundedSender<Vec<u8>>,
    /// Raw messages from the client that did not parse as requests.
    incoming_rx: futures::channel::mpsc::UnboundedReceiver<Vec<u8>>,
    /// Task reading from the client; held so that it lives as long as the fake server.
    _input_task: Task<Result<()>>,
    /// Task writing to the client; held so that it lives as long as the fake server.
    _output_task: Task<Result<()>>,
}
490
/// Shared table of the fake server's request handlers, keyed by method name. Each handler
/// receives the request id, the raw JSON parameters and an app context, and returns the
/// serialized response.
///
/// Gated like `FakeLanguageServer`, its only user, so that it is not compiled as dead code in
/// regular builds.
#[cfg(any(test, feature = "test-support"))]
type FakeLanguageServerHandlers = Arc<
    Mutex<
        HashMap<
            &'static str,
            Box<dyn Send + FnMut(usize, &[u8], gpui::AsyncAppContext) -> Vec<u8>>,
        >,
    >,
>;
499
500#[cfg(any(test, feature = "test-support"))]
501impl LanguageServer {
502 pub fn fake(cx: &mut gpui::MutableAppContext) -> (Arc<Self>, FakeLanguageServer) {
503 Self::fake_with_capabilities(Default::default(), cx)
504 }
505
506 pub fn fake_with_capabilities(
507 capabilities: ServerCapabilities,
508 cx: &mut gpui::MutableAppContext,
509 ) -> (Arc<Self>, FakeLanguageServer) {
510 let (stdin_writer, stdin_reader) = async_pipe::pipe();
511 let (stdout_writer, stdout_reader) = async_pipe::pipe();
512
513 let mut fake = FakeLanguageServer::new(stdin_reader, stdout_writer, cx);
514 fake.handle_request::<request::Initialize, _>({
515 let capabilities = capabilities.clone();
516 move |_, _| InitializeResult {
517 capabilities: capabilities.clone(),
518 ..Default::default()
519 }
520 });
521
522 let server = Self::new_internal(
523 stdin_writer,
524 stdout_reader,
525 Path::new("/"),
526 cx.background().clone(),
527 )
528 .unwrap();
529
530 (server, fake)
531 }
532}
533
534#[cfg(any(test, feature = "test-support"))]
535impl FakeLanguageServer {
536 fn new(
537 stdin: async_pipe::PipeReader,
538 stdout: async_pipe::PipeWriter,
539 cx: &mut gpui::MutableAppContext,
540 ) -> Self {
541 use futures::StreamExt as _;
542
543 let (incoming_tx, incoming_rx) = futures::channel::mpsc::unbounded();
544 let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded();
545 let handlers = FakeLanguageServerHandlers::default();
546
547 let input_task = cx.spawn(|cx| {
548 let handlers = handlers.clone();
549 let outgoing_tx = outgoing_tx.clone();
550 async move {
551 let mut buffer = Vec::new();
552 let mut stdin = smol::io::BufReader::new(stdin);
553 while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
554 cx.background().simulate_random_delay().await;
555 if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
556 assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
557
558 let response;
559 if let Some(handler) = handlers.lock().get_mut(request.method) {
560 response =
561 handler(request.id, request.params.get().as_bytes(), cx.clone());
562 log::debug!("handled lsp request. method:{}", request.method);
563 } else {
564 response = serde_json::to_vec(&AnyResponse {
565 id: request.id,
566 error: Some(Error {
567 message: "no handler".to_string(),
568 }),
569 result: None,
570 })
571 .unwrap();
572 log::debug!("unhandled lsp request. method:{}", request.method);
573 }
574 outgoing_tx.unbounded_send(response)?;
575 } else {
576 incoming_tx.unbounded_send(buffer.clone())?;
577 }
578 }
579 Ok::<_, anyhow::Error>(())
580 }
581 });
582
583 let output_task = cx.background().spawn(async move {
584 let mut stdout = smol::io::BufWriter::new(PipeWriterCloseOnDrop(stdout));
585 while let Some(message) = outgoing_rx.next().await {
586 stdout
587 .write_all(CONTENT_LEN_HEADER.as_bytes())
588 .await
589 .unwrap();
590 stdout
591 .write_all((format!("{}", message.len())).as_bytes())
592 .await
593 .unwrap();
594 stdout.write_all("\r\n\r\n".as_bytes()).await.unwrap();
595 stdout.write_all(&message).await.unwrap();
596 stdout.flush().await.unwrap();
597 }
598 Ok(())
599 });
600
601 Self {
602 outgoing_tx,
603 incoming_rx,
604 handlers,
605 _input_task: input_task,
606 _output_task: output_task,
607 }
608 }
609
610 pub async fn notify<T: notification::Notification>(&mut self, params: T::Params) {
611 let message = serde_json::to_vec(&Notification {
612 jsonrpc: JSON_RPC_VERSION,
613 method: T::METHOD,
614 params,
615 })
616 .unwrap();
617 self.outgoing_tx.unbounded_send(message).unwrap();
618 }
619
620 pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
621 use futures::StreamExt as _;
622
623 loop {
624 let bytes = self.incoming_rx.next().await.unwrap();
625 if let Ok(notification) = serde_json::from_slice::<Notification<T::Params>>(&bytes) {
626 assert_eq!(notification.method, T::METHOD);
627 return notification.params;
628 } else {
629 log::info!(
630 "skipping message in fake language server {:?}",
631 std::str::from_utf8(&bytes)
632 );
633 }
634 }
635 }
636
637 pub fn handle_request<T, F>(
638 &mut self,
639 mut handler: F,
640 ) -> futures::channel::mpsc::UnboundedReceiver<()>
641 where
642 T: 'static + request::Request,
643 F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> T::Result,
644 {
645 let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
646 self.handlers.lock().insert(
647 T::METHOD,
648 Box::new(move |id, params, cx| {
649 let result = handler(serde_json::from_slice::<T::Params>(params).unwrap(), cx);
650 let result = serde_json::to_string(&result).unwrap();
651 let result = serde_json::from_str::<&RawValue>(&result).unwrap();
652 let response = AnyResponse {
653 id,
654 error: None,
655 result: Some(result),
656 };
657 responded_tx.unbounded_send(()).ok();
658 serde_json::to_vec(&response).unwrap()
659 }),
660 );
661 responded_rx
662 }
663
664 pub fn remove_request_handler<T>(&mut self)
665 where
666 T: 'static + request::Request,
667 {
668 self.handlers.lock().remove(T::METHOD);
669 }
670
671 pub async fn start_progress(&mut self, token: impl Into<String>) {
672 self.notify::<notification::Progress>(ProgressParams {
673 token: NumberOrString::String(token.into()),
674 value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
675 })
676 .await;
677 }
678
679 pub async fn end_progress(&mut self, token: impl Into<String>) {
680 self.notify::<notification::Progress>(ProgressParams {
681 token: NumberOrString::String(token.into()),
682 value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
683 })
684 .await;
685 }
686
687 async fn receive(
688 stdin: &mut smol::io::BufReader<async_pipe::PipeReader>,
689 buffer: &mut Vec<u8>,
690 ) -> Result<()> {
691 buffer.clear();
692 stdin.read_until(b'\n', buffer).await?;
693 stdin.read_until(b'\n', buffer).await?;
694 let message_len: usize = std::str::from_utf8(buffer)
695 .unwrap()
696 .strip_prefix(CONTENT_LEN_HEADER)
697 .unwrap()
698 .trim_end()
699 .parse()
700 .unwrap();
701 buffer.resize(message_len, 0);
702 stdin.read_exact(buffer).await?;
703 Ok(())
704 }
705}
706
/// Wrapper around a pipe writer that closes the pipe when dropped.
///
/// Only the fake language server uses it, so it is gated the same way to keep it (and its
/// `async_pipe` dependency) out of regular builds.
#[cfg(any(test, feature = "test-support"))]
struct PipeWriterCloseOnDrop(async_pipe::PipeWriter);

#[cfg(any(test, feature = "test-support"))]
impl Drop for PipeWriterCloseOnDrop {
    fn drop(&mut self) {
        // Best effort: a failure to close is deliberately ignored.
        self.0.close().ok();
    }
}

/// Every operation is forwarded unchanged to the wrapped writer.
#[cfg(any(test, feature = "test-support"))]
impl AsyncWrite for PipeWriterCloseOnDrop {
    fn poll_write(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<std::io::Result<usize>> {
        let pipe = &mut self.0;
        smol::pin!(pipe);
        pipe.poll_write(cx, buf)
    }

    fn poll_flush(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        let pipe = &mut self.0;
        smol::pin!(pipe);
        pipe.poll_flush(cx)
    }

    fn poll_close(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        let pipe = &mut self.0;
        smol::pin!(pipe);
        pipe.poll_close(cx)
    }
}
744
745struct ClearResponseChannels(Arc<Mutex<HashMap<usize, ResponseHandler>>>);
746
747impl Drop for ClearResponseChannels {
748 fn drop(&mut self) {
749 self.0.lock().clear();
750 }
751}
752
753#[cfg(test)]
754mod tests {
755 use super::*;
756 use gpui::TestAppContext;
757
758 #[ctor::ctor]
759 fn init_logger() {
760 if std::env::var("RUST_LOG").is_ok() {
761 env_logger::init();
762 }
763 }
764
765 #[gpui::test]
766 async fn test_fake(cx: &mut TestAppContext) {
767 let (server, mut fake) = cx.update(LanguageServer::fake);
768
769 let (message_tx, message_rx) = channel::unbounded();
770 let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
771 server
772 .on_notification::<notification::ShowMessage, _>(move |params| {
773 message_tx.try_send(params).unwrap()
774 })
775 .detach();
776 server
777 .on_notification::<notification::PublishDiagnostics, _>(move |params| {
778 diagnostics_tx.try_send(params).unwrap()
779 })
780 .detach();
781
782 server
783 .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
784 text_document: TextDocumentItem::new(
785 Url::from_str("file://a/b").unwrap(),
786 "rust".to_string(),
787 0,
788 "".to_string(),
789 ),
790 })
791 .await
792 .unwrap();
793 assert_eq!(
794 fake.receive_notification::<notification::DidOpenTextDocument>()
795 .await
796 .text_document
797 .uri
798 .as_str(),
799 "file://a/b"
800 );
801
802 fake.notify::<notification::ShowMessage>(ShowMessageParams {
803 typ: MessageType::ERROR,
804 message: "ok".to_string(),
805 })
806 .await;
807 fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
808 uri: Url::from_str("file://b/c").unwrap(),
809 version: Some(5),
810 diagnostics: vec![],
811 })
812 .await;
813 assert_eq!(message_rx.recv().await.unwrap().message, "ok");
814 assert_eq!(
815 diagnostics_rx.recv().await.unwrap().uri.as_str(),
816 "file://b/c"
817 );
818
819 fake.handle_request::<request::Shutdown, _>(|_, _| ());
820
821 drop(server);
822 fake.receive_notification::<notification::Exit>().await;
823 }
824
825 pub enum ServerStatusNotification {}
826
827 impl notification::Notification for ServerStatusNotification {
828 type Params = ServerStatusParams;
829 const METHOD: &'static str = "experimental/serverStatus";
830 }
831
832 #[derive(Deserialize, Serialize, PartialEq, Eq, Clone)]
833 pub struct ServerStatusParams {
834 pub quiescent: bool,
835 }
836}