1pub use lsp_types::request::*;
2pub use lsp_types::*;
3
4use anyhow::{anyhow, Context, Result};
5use collections::HashMap;
6use futures::{channel::oneshot, io::BufWriter, AsyncRead, AsyncWrite};
7use gpui::{executor, AsyncAppContext, Task};
8use parking_lot::Mutex;
9use postage::{barrier, prelude::Stream};
10use serde::{de::DeserializeOwned, Deserialize, Serialize};
11use serde_json::{json, value::RawValue, Value};
12use smol::{
13 channel,
14 io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
15 process::{self, Child},
16};
17use std::{
18 future::Future,
19 io::Write,
20 path::PathBuf,
21 str::FromStr,
22 sync::{
23 atomic::{AtomicUsize, Ordering::SeqCst},
24 Arc,
25 },
26};
27use std::{path::Path, process::Stdio};
28use util::{ResultExt, TryFutureExt};
29
30const JSON_RPC_VERSION: &str = "2.0";
31const CONTENT_LEN_HEADER: &str = "Content-Length: ";
32
33type NotificationHandler = Box<dyn Send + FnMut(Option<usize>, &str, AsyncAppContext)>;
34type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
35
36pub struct LanguageServer {
37 server_id: usize,
38 next_id: AtomicUsize,
39 outbound_tx: channel::Sender<Vec<u8>>,
40 name: String,
41 capabilities: ServerCapabilities,
42 notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
43 response_handlers: Arc<Mutex<Option<HashMap<usize, ResponseHandler>>>>,
44 executor: Arc<executor::Background>,
45 #[allow(clippy::type_complexity)]
46 io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
47 output_done_rx: Mutex<Option<barrier::Receiver>>,
48 root_path: PathBuf,
49 _server: Option<Child>,
50}
51
52pub struct Subscription {
53 method: &'static str,
54 notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
55}
56
57#[derive(Serialize, Deserialize)]
58struct Request<'a, T> {
59 jsonrpc: &'static str,
60 id: usize,
61 method: &'a str,
62 params: T,
63}
64
65#[derive(Serialize, Deserialize)]
66struct AnyResponse<'a> {
67 jsonrpc: &'a str,
68 id: usize,
69 #[serde(default)]
70 error: Option<Error>,
71 #[serde(borrow)]
72 result: Option<&'a RawValue>,
73}
74
75#[derive(Serialize)]
76struct Response<T> {
77 jsonrpc: &'static str,
78 id: usize,
79 result: Option<T>,
80 error: Option<Error>,
81}
82
83#[derive(Serialize, Deserialize)]
84struct Notification<'a, T> {
85 jsonrpc: &'static str,
86 #[serde(borrow)]
87 method: &'a str,
88 params: T,
89}
90
91#[derive(Deserialize)]
92struct AnyNotification<'a> {
93 #[serde(default)]
94 id: Option<usize>,
95 #[serde(borrow)]
96 method: &'a str,
97 #[serde(borrow)]
98 params: &'a RawValue,
99}
100
101#[derive(Debug, Serialize, Deserialize)]
102struct Error {
103 message: String,
104}
105
106impl LanguageServer {
107 pub fn new<T: AsRef<std::ffi::OsStr>>(
108 server_id: usize,
109 binary_path: &Path,
110 args: &[T],
111 root_path: &Path,
112 cx: AsyncAppContext,
113 ) -> Result<Self> {
114 let working_dir = if root_path.is_dir() {
115 root_path
116 } else {
117 root_path.parent().unwrap_or_else(|| Path::new("/"))
118 };
119 let mut server = process::Command::new(binary_path)
120 .current_dir(working_dir)
121 .args(args)
122 .stdin(Stdio::piped())
123 .stdout(Stdio::piped())
124 .stderr(Stdio::inherit())
125 .kill_on_drop(true)
126 .spawn()?;
127
128 let stdin = server.stdin.take().unwrap();
129 let stout = server.stdout.take().unwrap();
130
131 let mut server = Self::new_internal(
132 server_id,
133 stdin,
134 stout,
135 Some(server),
136 root_path,
137 cx,
138 |notification| {
139 log::info!(
140 "unhandled notification {}:\n{}",
141 notification.method,
142 serde_json::to_string_pretty(
143 &Value::from_str(notification.params.get()).unwrap()
144 )
145 .unwrap()
146 );
147 },
148 );
149 if let Some(name) = binary_path.file_name() {
150 server.name = name.to_string_lossy().to_string();
151 }
152 Ok(server)
153 }
154
155 fn new_internal<Stdin, Stdout, F>(
156 server_id: usize,
157 stdin: Stdin,
158 stdout: Stdout,
159 server: Option<Child>,
160 root_path: &Path,
161 cx: AsyncAppContext,
162 mut on_unhandled_notification: F,
163 ) -> Self
164 where
165 Stdin: AsyncWrite + Unpin + Send + 'static,
166 Stdout: AsyncRead + Unpin + Send + 'static,
167 F: FnMut(AnyNotification) + 'static + Send,
168 {
169 let mut stdin = BufWriter::new(stdin);
170 let mut stdout = BufReader::new(stdout);
171 let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
172 let notification_handlers =
173 Arc::new(Mutex::new(HashMap::<_, NotificationHandler>::default()));
174 let response_handlers =
175 Arc::new(Mutex::new(Some(HashMap::<_, ResponseHandler>::default())));
176 let input_task = cx.spawn(|cx| {
177 let notification_handlers = notification_handlers.clone();
178 let response_handlers = response_handlers.clone();
179 async move {
180 let _clear_response_handlers = util::defer({
181 let response_handlers = response_handlers.clone();
182 move || {
183 response_handlers.lock().take();
184 }
185 });
186 let mut buffer = Vec::new();
187 loop {
188 buffer.clear();
189 stdout.read_until(b'\n', &mut buffer).await?;
190 stdout.read_until(b'\n', &mut buffer).await?;
191 let message_len: usize = std::str::from_utf8(&buffer)?
192 .strip_prefix(CONTENT_LEN_HEADER)
193 .ok_or_else(|| anyhow!("invalid header"))?
194 .trim_end()
195 .parse()?;
196
197 buffer.resize(message_len, 0);
198 stdout.read_exact(&mut buffer).await?;
199 log::trace!("incoming message:{}", String::from_utf8_lossy(&buffer));
200
201 if let Ok(msg) = serde_json::from_slice::<AnyNotification>(&buffer) {
202 if let Some(handler) = notification_handlers.lock().get_mut(msg.method) {
203 handler(msg.id, msg.params.get(), cx.clone());
204 } else {
205 on_unhandled_notification(msg);
206 }
207 } else if let Ok(AnyResponse {
208 id, error, result, ..
209 }) = serde_json::from_slice(&buffer)
210 {
211 if let Some(handler) = response_handlers
212 .lock()
213 .as_mut()
214 .and_then(|handlers| handlers.remove(&id))
215 {
216 if let Some(error) = error {
217 handler(Err(error));
218 } else if let Some(result) = result {
219 handler(Ok(result.get()));
220 } else {
221 handler(Ok("null"));
222 }
223 }
224 } else {
225 return Err(anyhow!(
226 "failed to deserialize message:\n{}",
227 std::str::from_utf8(&buffer)?
228 ));
229 }
230
231 // Don't starve the main thread when receiving lots of messages at once.
232 smol::future::yield_now().await;
233 }
234 }
235 .log_err()
236 });
237 let (output_done_tx, output_done_rx) = barrier::channel();
238 let output_task = cx.background().spawn({
239 let response_handlers = response_handlers.clone();
240 async move {
241 let _clear_response_handlers = util::defer({
242 let response_handlers = response_handlers.clone();
243 move || {
244 response_handlers.lock().take();
245 }
246 });
247 let mut content_len_buffer = Vec::new();
248 while let Ok(message) = outbound_rx.recv().await {
249 log::trace!("outgoing message:{}", String::from_utf8_lossy(&message));
250 content_len_buffer.clear();
251 write!(content_len_buffer, "{}", message.len()).unwrap();
252 stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
253 stdin.write_all(&content_len_buffer).await?;
254 stdin.write_all("\r\n\r\n".as_bytes()).await?;
255 stdin.write_all(&message).await?;
256 stdin.flush().await?;
257 }
258 drop(output_done_tx);
259 Ok(())
260 }
261 .log_err()
262 });
263
264 Self {
265 server_id,
266 notification_handlers,
267 response_handlers,
268 name: Default::default(),
269 capabilities: Default::default(),
270 next_id: Default::default(),
271 outbound_tx,
272 executor: cx.background(),
273 io_tasks: Mutex::new(Some((input_task, output_task))),
274 output_done_rx: Mutex::new(Some(output_done_rx)),
275 root_path: root_path.to_path_buf(),
276 _server: server,
277 }
278 }
279
280 /// Initializes a language server.
281 /// Note that `options` is used directly to construct [`InitializeParams`],
282 /// which is why it is owned.
283 pub async fn initialize(mut self, options: Option<Value>) -> Result<Arc<Self>> {
284 let root_uri = Url::from_file_path(&self.root_path).unwrap();
285 #[allow(deprecated)]
286 let params = InitializeParams {
287 process_id: Default::default(),
288 root_path: Default::default(),
289 root_uri: Some(root_uri.clone()),
290 initialization_options: options,
291 capabilities: ClientCapabilities {
292 workspace: Some(WorkspaceClientCapabilities {
293 configuration: Some(true),
294 did_change_configuration: Some(DynamicRegistrationClientCapabilities {
295 dynamic_registration: Some(true),
296 }),
297 ..Default::default()
298 }),
299 text_document: Some(TextDocumentClientCapabilities {
300 definition: Some(GotoCapability {
301 link_support: Some(true),
302 ..Default::default()
303 }),
304 code_action: Some(CodeActionClientCapabilities {
305 code_action_literal_support: Some(CodeActionLiteralSupport {
306 code_action_kind: CodeActionKindLiteralSupport {
307 value_set: vec![
308 CodeActionKind::REFACTOR.as_str().into(),
309 CodeActionKind::QUICKFIX.as_str().into(),
310 CodeActionKind::SOURCE.as_str().into(),
311 ],
312 },
313 }),
314 data_support: Some(true),
315 resolve_support: Some(CodeActionCapabilityResolveSupport {
316 properties: vec!["edit".to_string(), "command".to_string()],
317 }),
318 ..Default::default()
319 }),
320 completion: Some(CompletionClientCapabilities {
321 completion_item: Some(CompletionItemCapability {
322 snippet_support: Some(true),
323 resolve_support: Some(CompletionItemCapabilityResolveSupport {
324 properties: vec!["additionalTextEdits".to_string()],
325 }),
326 ..Default::default()
327 }),
328 ..Default::default()
329 }),
330 rename: Some(RenameClientCapabilities {
331 prepare_support: Some(true),
332 ..Default::default()
333 }),
334 hover: Some(HoverClientCapabilities {
335 content_format: Some(vec![MarkupKind::Markdown]),
336 ..Default::default()
337 }),
338 ..Default::default()
339 }),
340 experimental: Some(json!({
341 "serverStatusNotification": true,
342 })),
343 window: Some(WindowClientCapabilities {
344 work_done_progress: Some(true),
345 ..Default::default()
346 }),
347 ..Default::default()
348 },
349 trace: Default::default(),
350 workspace_folders: Some(vec![WorkspaceFolder {
351 uri: root_uri,
352 name: Default::default(),
353 }]),
354 client_info: Default::default(),
355 locale: Default::default(),
356 };
357
358 let response = self.request::<request::Initialize>(params).await?;
359 if let Some(info) = response.server_info {
360 self.name = info.name;
361 }
362 self.capabilities = response.capabilities;
363
364 self.notify::<notification::Initialized>(InitializedParams {})?;
365 Ok(Arc::new(self))
366 }
367
368 pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
369 if let Some(tasks) = self.io_tasks.lock().take() {
370 let response_handlers = self.response_handlers.clone();
371 let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
372 let outbound_tx = self.outbound_tx.clone();
373 let mut output_done = self.output_done_rx.lock().take().unwrap();
374 let shutdown_request = Self::request_internal::<request::Shutdown>(
375 &next_id,
376 &response_handlers,
377 &outbound_tx,
378 (),
379 );
380 let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
381 outbound_tx.close();
382 Some(
383 async move {
384 log::debug!("language server shutdown started");
385 shutdown_request.await?;
386 response_handlers.lock().take();
387 exit?;
388 output_done.recv().await;
389 log::debug!("language server shutdown finished");
390 drop(tasks);
391 Ok(())
392 }
393 .log_err(),
394 )
395 } else {
396 None
397 }
398 }
399
400 #[must_use]
401 pub fn on_notification<T, F>(&self, f: F) -> Subscription
402 where
403 T: notification::Notification,
404 F: 'static + Send + FnMut(T::Params, AsyncAppContext),
405 {
406 self.on_custom_notification(T::METHOD, f)
407 }
408
409 #[must_use]
410 pub fn on_request<T, F, Fut>(&self, f: F) -> Subscription
411 where
412 T: request::Request,
413 T::Params: 'static + Send,
414 F: 'static + Send + FnMut(T::Params, AsyncAppContext) -> Fut,
415 Fut: 'static + Future<Output = Result<T::Result>>,
416 {
417 self.on_custom_request(T::METHOD, f)
418 }
419
420 pub fn remove_request_handler<T: request::Request>(&self) {
421 self.notification_handlers.lock().remove(T::METHOD);
422 }
423
424 #[must_use]
425 pub fn on_custom_notification<Params, F>(&self, method: &'static str, mut f: F) -> Subscription
426 where
427 F: 'static + Send + FnMut(Params, AsyncAppContext),
428 Params: DeserializeOwned,
429 {
430 let prev_handler = self.notification_handlers.lock().insert(
431 method,
432 Box::new(move |_, params, cx| {
433 if let Some(params) = serde_json::from_str(params).log_err() {
434 f(params, cx);
435 }
436 }),
437 );
438 assert!(
439 prev_handler.is_none(),
440 "registered multiple handlers for the same LSP method"
441 );
442 Subscription {
443 method,
444 notification_handlers: self.notification_handlers.clone(),
445 }
446 }
447
448 #[must_use]
449 pub fn on_custom_request<Params, Res, Fut, F>(
450 &self,
451 method: &'static str,
452 mut f: F,
453 ) -> Subscription
454 where
455 F: 'static + Send + FnMut(Params, AsyncAppContext) -> Fut,
456 Fut: 'static + Future<Output = Result<Res>>,
457 Params: DeserializeOwned + Send + 'static,
458 Res: Serialize,
459 {
460 let outbound_tx = self.outbound_tx.clone();
461 let prev_handler = self.notification_handlers.lock().insert(
462 method,
463 Box::new(move |id, params, cx| {
464 if let Some(id) = id {
465 match serde_json::from_str(params) {
466 Ok(params) => {
467 let response = f(params, cx.clone());
468 cx.foreground()
469 .spawn({
470 let outbound_tx = outbound_tx.clone();
471 async move {
472 let response = match response.await {
473 Ok(result) => Response {
474 jsonrpc: JSON_RPC_VERSION,
475 id,
476 result: Some(result),
477 error: None,
478 },
479 Err(error) => Response {
480 jsonrpc: JSON_RPC_VERSION,
481 id,
482 result: None,
483 error: Some(Error {
484 message: error.to_string(),
485 }),
486 },
487 };
488 if let Some(response) =
489 serde_json::to_vec(&response).log_err()
490 {
491 outbound_tx.try_send(response).ok();
492 }
493 }
494 })
495 .detach();
496 }
497 Err(error) => {
498 log::error!(
499 "error deserializing {} request: {:?}, message: {:?}",
500 method,
501 error,
502 params
503 );
504 let response = AnyResponse {
505 jsonrpc: JSON_RPC_VERSION,
506 id,
507 result: None,
508 error: Some(Error {
509 message: error.to_string(),
510 }),
511 };
512 if let Some(response) = serde_json::to_vec(&response).log_err() {
513 outbound_tx.try_send(response).ok();
514 }
515 }
516 }
517 }
518 }),
519 );
520 assert!(
521 prev_handler.is_none(),
522 "registered multiple handlers for the same LSP method"
523 );
524 Subscription {
525 method,
526 notification_handlers: self.notification_handlers.clone(),
527 }
528 }
529
530 pub fn name<'a>(self: &'a Arc<Self>) -> &'a str {
531 &self.name
532 }
533
534 pub fn capabilities<'a>(self: &'a Arc<Self>) -> &'a ServerCapabilities {
535 &self.capabilities
536 }
537
538 pub fn server_id(&self) -> usize {
539 self.server_id
540 }
541
542 pub fn root_path(&self) -> &PathBuf {
543 &self.root_path
544 }
545
546 pub fn request<T: request::Request>(
547 &self,
548 params: T::Params,
549 ) -> impl Future<Output = Result<T::Result>>
550 where
551 T::Result: 'static + Send,
552 {
553 Self::request_internal::<T>(
554 &self.next_id,
555 &self.response_handlers,
556 &self.outbound_tx,
557 params,
558 )
559 }
560
561 fn request_internal<T: request::Request>(
562 next_id: &AtomicUsize,
563 response_handlers: &Mutex<Option<HashMap<usize, ResponseHandler>>>,
564 outbound_tx: &channel::Sender<Vec<u8>>,
565 params: T::Params,
566 ) -> impl 'static + Future<Output = Result<T::Result>>
567 where
568 T::Result: 'static + Send,
569 {
570 let id = next_id.fetch_add(1, SeqCst);
571 let message = serde_json::to_vec(&Request {
572 jsonrpc: JSON_RPC_VERSION,
573 id,
574 method: T::METHOD,
575 params,
576 })
577 .unwrap();
578
579 let (tx, rx) = oneshot::channel();
580 let handle_response = response_handlers
581 .lock()
582 .as_mut()
583 .ok_or_else(|| anyhow!("server shut down"))
584 .map(|handlers| {
585 handlers.insert(
586 id,
587 Box::new(move |result| {
588 let response = match result {
589 Ok(response) => serde_json::from_str(response)
590 .context("failed to deserialize response"),
591 Err(error) => Err(anyhow!("{}", error.message)),
592 };
593 let _ = tx.send(response);
594 }),
595 );
596 });
597
598 let send = outbound_tx
599 .try_send(message)
600 .context("failed to write to language server's stdin");
601
602 async move {
603 handle_response?;
604 send?;
605 rx.await?
606 }
607 }
608
609 pub fn notify<T: notification::Notification>(&self, params: T::Params) -> Result<()> {
610 Self::notify_internal::<T>(&self.outbound_tx, params)
611 }
612
613 fn notify_internal<T: notification::Notification>(
614 outbound_tx: &channel::Sender<Vec<u8>>,
615 params: T::Params,
616 ) -> Result<()> {
617 let message = serde_json::to_vec(&Notification {
618 jsonrpc: JSON_RPC_VERSION,
619 method: T::METHOD,
620 params,
621 })
622 .unwrap();
623 outbound_tx.try_send(message)?;
624 Ok(())
625 }
626}
627
628impl Drop for LanguageServer {
629 fn drop(&mut self) {
630 if let Some(shutdown) = self.shutdown() {
631 self.executor.spawn(shutdown).detach();
632 }
633 }
634}
635
636impl Subscription {
637 pub fn detach(mut self) {
638 self.method = "";
639 }
640}
641
642impl Drop for Subscription {
643 fn drop(&mut self) {
644 self.notification_handlers.lock().remove(self.method);
645 }
646}
647
/// In-process stand-in for a language server, for use in tests.
///
/// It is itself a `LanguageServer` wired to the other ends of the client's pipes.
#[cfg(any(test, feature = "test-support"))]
#[derive(Clone)]
pub struct FakeLanguageServer {
    /// The fake's own endpoint, used to talk back to the client under test.
    pub server: Arc<LanguageServer>,
    /// `(method, params)` of every incoming message that had no registered handler.
    notifications_rx: channel::Receiver<(String, String)>,
}
654
655#[cfg(any(test, feature = "test-support"))]
656impl LanguageServer {
657 pub fn full_capabilities() -> ServerCapabilities {
658 ServerCapabilities {
659 document_highlight_provider: Some(OneOf::Left(true)),
660 code_action_provider: Some(CodeActionProviderCapability::Simple(true)),
661 document_formatting_provider: Some(OneOf::Left(true)),
662 document_range_formatting_provider: Some(OneOf::Left(true)),
663 ..Default::default()
664 }
665 }
666
667 pub fn fake(
668 name: String,
669 capabilities: ServerCapabilities,
670 cx: AsyncAppContext,
671 ) -> (Self, FakeLanguageServer) {
672 let (stdin_writer, stdin_reader) = async_pipe::pipe();
673 let (stdout_writer, stdout_reader) = async_pipe::pipe();
674 let (notifications_tx, notifications_rx) = channel::unbounded();
675
676 let server = Self::new_internal(
677 0,
678 stdin_writer,
679 stdout_reader,
680 None,
681 Path::new("/"),
682 cx.clone(),
683 |_| {},
684 );
685 let fake = FakeLanguageServer {
686 server: Arc::new(Self::new_internal(
687 0,
688 stdout_writer,
689 stdin_reader,
690 None,
691 Path::new("/"),
692 cx,
693 move |msg| {
694 notifications_tx
695 .try_send((msg.method.to_string(), msg.params.get().to_string()))
696 .ok();
697 },
698 )),
699 notifications_rx,
700 };
701 fake.handle_request::<request::Initialize, _, _>({
702 let capabilities = capabilities;
703 move |_, _| {
704 let capabilities = capabilities.clone();
705 let name = name.clone();
706 async move {
707 Ok(InitializeResult {
708 capabilities,
709 server_info: Some(ServerInfo {
710 name,
711 ..Default::default()
712 }),
713 })
714 }
715 }
716 });
717
718 (server, fake)
719 }
720}
721
722#[cfg(any(test, feature = "test-support"))]
723impl FakeLanguageServer {
724 pub fn notify<T: notification::Notification>(&self, params: T::Params) {
725 self.server.notify::<T>(params).ok();
726 }
727
728 pub async fn request<T>(&self, params: T::Params) -> Result<T::Result>
729 where
730 T: request::Request,
731 T::Result: 'static + Send,
732 {
733 self.server.request::<T>(params).await
734 }
735
736 pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
737 self.try_receive_notification::<T>().await.unwrap()
738 }
739
740 pub async fn try_receive_notification<T: notification::Notification>(
741 &mut self,
742 ) -> Option<T::Params> {
743 use futures::StreamExt as _;
744
745 loop {
746 let (method, params) = self.notifications_rx.next().await?;
747 if method == T::METHOD {
748 return Some(serde_json::from_str::<T::Params>(¶ms).unwrap());
749 } else {
750 log::info!("skipping message in fake language server {:?}", params);
751 }
752 }
753 }
754
755 pub fn handle_request<T, F, Fut>(
756 &self,
757 mut handler: F,
758 ) -> futures::channel::mpsc::UnboundedReceiver<()>
759 where
760 T: 'static + request::Request,
761 T::Params: 'static + Send,
762 F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> Fut,
763 Fut: 'static + Send + Future<Output = Result<T::Result>>,
764 {
765 let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
766 self.server.remove_request_handler::<T>();
767 self.server
768 .on_request::<T, _, _>(move |params, cx| {
769 let result = handler(params, cx.clone());
770 let responded_tx = responded_tx.clone();
771 async move {
772 cx.background().simulate_random_delay().await;
773 let result = result.await;
774 responded_tx.unbounded_send(()).ok();
775 result
776 }
777 })
778 .detach();
779 responded_rx
780 }
781
782 pub fn remove_request_handler<T>(&mut self)
783 where
784 T: 'static + request::Request,
785 {
786 self.server.remove_request_handler::<T>();
787 }
788
789 pub async fn start_progress(&self, token: impl Into<String>) {
790 let token = token.into();
791 self.request::<request::WorkDoneProgressCreate>(WorkDoneProgressCreateParams {
792 token: NumberOrString::String(token.clone()),
793 })
794 .await
795 .unwrap();
796 self.notify::<notification::Progress>(ProgressParams {
797 token: NumberOrString::String(token),
798 value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
799 });
800 }
801
802 pub fn end_progress(&self, token: impl Into<String>) {
803 self.notify::<notification::Progress>(ProgressParams {
804 token: NumberOrString::String(token.into()),
805 value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
806 });
807 }
808}
809
#[cfg(test)]
mod tests {
    use super::*;
    use gpui::TestAppContext;

    /// Installs `env_logger` at load time, but only when `RUST_LOG` is set.
    #[ctor::ctor]
    fn init_logger() {
        if std::env::var("RUST_LOG").is_ok() {
            env_logger::init();
        }
    }

    /// Round-trips notifications in both directions between a client and the
    /// fake server, then checks that dropping the client sends `exit`.
    #[gpui::test]
    async fn test_fake(cx: &mut TestAppContext) {
        let (client, mut fake) =
            LanguageServer::fake("the-lsp".to_string(), Default::default(), cx.to_async());

        // Forward what the client receives into channels the test can await.
        let (message_tx, message_rx) = channel::unbounded();
        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
        client
            .on_notification::<notification::ShowMessage, _>(move |params, _| {
                message_tx.try_send(params).unwrap()
            })
            .detach();
        client
            .on_notification::<notification::PublishDiagnostics, _>(move |params, _| {
                diagnostics_tx.try_send(params).unwrap()
            })
            .detach();

        let client = client.initialize(None).await.unwrap();

        // Client -> server.
        let opened = DidOpenTextDocumentParams {
            text_document: TextDocumentItem::new(
                Url::from_str("file://a/b").unwrap(),
                "rust".to_string(),
                0,
                "".to_string(),
            ),
        };
        client
            .notify::<notification::DidOpenTextDocument>(opened)
            .unwrap();
        let received = fake
            .receive_notification::<notification::DidOpenTextDocument>()
            .await;
        assert_eq!(received.text_document.uri.as_str(), "file://a/b");

        // Server -> client.
        fake.notify::<notification::ShowMessage>(ShowMessageParams {
            typ: MessageType::ERROR,
            message: "ok".to_string(),
        });
        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
            uri: Url::from_str("file://b/c").unwrap(),
            version: Some(5),
            diagnostics: vec![],
        });
        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
        assert_eq!(
            diagnostics_rx.recv().await.unwrap().uri.as_str(),
            "file://b/c"
        );

        fake.handle_request::<request::Shutdown, _, _>(|_, _| async move { Ok(()) });

        // Dropping the client runs the shutdown sequence, which ends with `exit`.
        drop(client);
        fake.receive_notification::<notification::Exit>().await;
    }
}