1use anyhow::{anyhow, Context, Result};
2use collections::HashMap;
3use futures::{channel::oneshot, io::BufWriter, AsyncRead, AsyncWrite};
4use gpui::{executor, Task};
5use parking_lot::{Mutex, RwLock};
6use postage::{barrier, prelude::Stream};
7use serde::{de::DeserializeOwned, Deserialize, Serialize};
8use serde_json::{json, value::RawValue, Value};
9use smol::{
10 channel,
11 io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
12 process::Command,
13};
14use std::{
15 future::Future,
16 io::Write,
17 path::PathBuf,
18 str::FromStr,
19 sync::{
20 atomic::{AtomicUsize, Ordering::SeqCst},
21 Arc,
22 },
23};
24use std::{path::Path, process::Stdio};
25use util::TryFutureExt;
26
27pub use lsp_types::*;
28
29const JSON_RPC_VERSION: &'static str = "2.0";
30const CONTENT_LEN_HEADER: &'static str = "Content-Length: ";
31
32type NotificationHandler =
33 Box<dyn Send + Sync + FnMut(Option<usize>, &str, &mut channel::Sender<Vec<u8>>) -> Result<()>>;
34type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
35
36pub struct LanguageServer {
37 next_id: AtomicUsize,
38 outbound_tx: channel::Sender<Vec<u8>>,
39 name: String,
40 capabilities: ServerCapabilities,
41 notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
42 response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
43 executor: Arc<executor::Background>,
44 io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
45 output_done_rx: Mutex<Option<barrier::Receiver>>,
46 root_path: PathBuf,
47 options: Option<Value>,
48}
49
50pub struct Subscription {
51 method: &'static str,
52 notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
53}
54
55#[derive(Serialize, Deserialize)]
56struct Request<'a, T> {
57 jsonrpc: &'a str,
58 id: usize,
59 method: &'a str,
60 params: T,
61}
62
63#[cfg(any(test, feature = "test-support"))]
64#[derive(Deserialize)]
65struct AnyRequest<'a> {
66 id: usize,
67 #[serde(borrow)]
68 jsonrpc: &'a str,
69 #[serde(borrow)]
70 method: &'a str,
71 #[serde(borrow)]
72 params: &'a RawValue,
73}
74
75#[derive(Serialize, Deserialize)]
76struct AnyResponse<'a> {
77 id: usize,
78 #[serde(default)]
79 error: Option<Error>,
80 #[serde(borrow)]
81 result: Option<&'a RawValue>,
82}
83
84#[derive(Serialize)]
85struct Response<T> {
86 id: usize,
87 result: T,
88}
89
90#[derive(Serialize, Deserialize)]
91struct Notification<'a, T> {
92 #[serde(borrow)]
93 jsonrpc: &'a str,
94 #[serde(borrow)]
95 method: &'a str,
96 params: T,
97}
98
99#[derive(Deserialize)]
100struct AnyNotification<'a> {
101 #[serde(default)]
102 id: Option<usize>,
103 #[serde(borrow)]
104 method: &'a str,
105 #[serde(borrow)]
106 params: &'a RawValue,
107}
108
109#[derive(Debug, Serialize, Deserialize)]
110struct Error {
111 message: String,
112}
113
114impl LanguageServer {
115 pub fn new(
116 binary_path: &Path,
117 args: &[&str],
118 root_path: &Path,
119 options: Option<Value>,
120 background: Arc<executor::Background>,
121 ) -> Result<Self> {
122 let working_dir = if root_path.is_dir() {
123 root_path
124 } else {
125 root_path.parent().unwrap_or(Path::new("/"))
126 };
127 let mut server = Command::new(binary_path)
128 .current_dir(working_dir)
129 .args(args)
130 .stdin(Stdio::piped())
131 .stdout(Stdio::piped())
132 .stderr(Stdio::inherit())
133 .spawn()?;
134 let stdin = server.stdin.take().unwrap();
135 let stdout = server.stdout.take().unwrap();
136 let mut server = Self::new_internal(stdin, stdout, root_path, options, background);
137 if let Some(name) = binary_path.file_name() {
138 server.name = name.to_string_lossy().to_string();
139 }
140 Ok(server)
141 }
142
143 fn new_internal<Stdin, Stdout>(
144 stdin: Stdin,
145 stdout: Stdout,
146 root_path: &Path,
147 options: Option<Value>,
148 executor: Arc<executor::Background>,
149 ) -> Self
150 where
151 Stdin: AsyncWrite + Unpin + Send + 'static,
152 Stdout: AsyncRead + Unpin + Send + 'static,
153 {
154 let mut stdin = BufWriter::new(stdin);
155 let mut stdout = BufReader::new(stdout);
156 let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
157 let notification_handlers =
158 Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::default()));
159 let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::default()));
160 let input_task = executor.spawn(
161 {
162 let notification_handlers = notification_handlers.clone();
163 let response_handlers = response_handlers.clone();
164 let mut outbound_tx = outbound_tx.clone();
165 async move {
166 let _clear_response_handlers = ClearResponseHandlers(response_handlers.clone());
167 let mut buffer = Vec::new();
168 loop {
169 buffer.clear();
170 stdout.read_until(b'\n', &mut buffer).await?;
171 stdout.read_until(b'\n', &mut buffer).await?;
172 let message_len: usize = std::str::from_utf8(&buffer)?
173 .strip_prefix(CONTENT_LEN_HEADER)
174 .ok_or_else(|| anyhow!("invalid header"))?
175 .trim_end()
176 .parse()?;
177
178 buffer.resize(message_len, 0);
179 stdout.read_exact(&mut buffer).await?;
180
181 if let Ok(AnyNotification { id, method, params }) =
182 serde_json::from_slice(&buffer)
183 {
184 if let Some(handler) = notification_handlers.write().get_mut(method) {
185 if let Err(e) = handler(id, params.get(), &mut outbound_tx) {
186 log::error!("error handling {} message: {:?}", method, e);
187 }
188 } else {
189 log::info!(
190 "unhandled notification {}:\n{}",
191 method,
192 serde_json::to_string_pretty(
193 &Value::from_str(params.get()).unwrap()
194 )
195 .unwrap()
196 );
197 }
198 } else if let Ok(AnyResponse { id, error, result }) =
199 serde_json::from_slice(&buffer)
200 {
201 if let Some(handler) = response_handlers.lock().remove(&id) {
202 if let Some(error) = error {
203 handler(Err(error));
204 } else if let Some(result) = result {
205 handler(Ok(result.get()));
206 } else {
207 handler(Ok("null"));
208 }
209 }
210 } else {
211 return Err(anyhow!(
212 "failed to deserialize message:\n{}",
213 std::str::from_utf8(&buffer)?
214 ));
215 }
216 }
217 }
218 }
219 .log_err(),
220 );
221 let (output_done_tx, output_done_rx) = barrier::channel();
222 let output_task = executor.spawn({
223 let response_handlers = response_handlers.clone();
224 async move {
225 let _clear_response_handlers = ClearResponseHandlers(response_handlers);
226 let mut content_len_buffer = Vec::new();
227 while let Ok(message) = outbound_rx.recv().await {
228 content_len_buffer.clear();
229 write!(content_len_buffer, "{}", message.len()).unwrap();
230 stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
231 stdin.write_all(&content_len_buffer).await?;
232 stdin.write_all("\r\n\r\n".as_bytes()).await?;
233 stdin.write_all(&message).await?;
234 stdin.flush().await?;
235 }
236 drop(output_done_tx);
237 Ok(())
238 }
239 .log_err()
240 });
241
242 Self {
243 notification_handlers,
244 response_handlers,
245 name: Default::default(),
246 capabilities: Default::default(),
247 next_id: Default::default(),
248 outbound_tx,
249 executor: executor.clone(),
250 io_tasks: Mutex::new(Some((input_task, output_task))),
251 output_done_rx: Mutex::new(Some(output_done_rx)),
252 root_path: root_path.to_path_buf(),
253 options,
254 }
255 }
256
257 pub async fn initialize(mut self) -> Result<Arc<Self>> {
258 let options = self.options.take();
259 let mut this = Arc::new(self);
260 let root_uri = Url::from_file_path(&this.root_path).unwrap();
261 #[allow(deprecated)]
262 let params = InitializeParams {
263 process_id: Default::default(),
264 root_path: Default::default(),
265 root_uri: Some(root_uri),
266 initialization_options: options,
267 capabilities: ClientCapabilities {
268 workspace: Some(WorkspaceClientCapabilities {
269 configuration: Some(true),
270 did_change_configuration: Some(DynamicRegistrationClientCapabilities {
271 dynamic_registration: Some(true),
272 }),
273 ..Default::default()
274 }),
275 text_document: Some(TextDocumentClientCapabilities {
276 definition: Some(GotoCapability {
277 link_support: Some(true),
278 ..Default::default()
279 }),
280 code_action: Some(CodeActionClientCapabilities {
281 code_action_literal_support: Some(CodeActionLiteralSupport {
282 code_action_kind: CodeActionKindLiteralSupport {
283 value_set: vec![
284 CodeActionKind::REFACTOR.as_str().into(),
285 CodeActionKind::QUICKFIX.as_str().into(),
286 ],
287 },
288 }),
289 data_support: Some(true),
290 resolve_support: Some(CodeActionCapabilityResolveSupport {
291 properties: vec!["edit".to_string()],
292 }),
293 ..Default::default()
294 }),
295 completion: Some(CompletionClientCapabilities {
296 completion_item: Some(CompletionItemCapability {
297 snippet_support: Some(true),
298 resolve_support: Some(CompletionItemCapabilityResolveSupport {
299 properties: vec!["additionalTextEdits".to_string()],
300 }),
301 ..Default::default()
302 }),
303 ..Default::default()
304 }),
305 ..Default::default()
306 }),
307 experimental: Some(json!({
308 "serverStatusNotification": true,
309 })),
310 window: Some(WindowClientCapabilities {
311 work_done_progress: Some(true),
312 ..Default::default()
313 }),
314 ..Default::default()
315 },
316 trace: Default::default(),
317 workspace_folders: Default::default(),
318 client_info: Default::default(),
319 locale: Default::default(),
320 };
321
322 let response = this.request::<request::Initialize>(params).await?;
323 {
324 let this = Arc::get_mut(&mut this).unwrap();
325 if let Some(info) = response.server_info {
326 this.name = info.name;
327 }
328 this.capabilities = response.capabilities;
329 }
330 this.notify::<notification::Initialized>(InitializedParams {})?;
331 Ok(this)
332 }
333
334 pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
335 if let Some(tasks) = self.io_tasks.lock().take() {
336 let response_handlers = self.response_handlers.clone();
337 let next_id = AtomicUsize::new(self.next_id.load(SeqCst));
338 let outbound_tx = self.outbound_tx.clone();
339 let mut output_done = self.output_done_rx.lock().take().unwrap();
340 let shutdown_request = Self::request_internal::<request::Shutdown>(
341 &next_id,
342 &response_handlers,
343 &outbound_tx,
344 (),
345 );
346 let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
347 outbound_tx.close();
348 Some(
349 async move {
350 log::debug!("language server shutdown started");
351 shutdown_request.await?;
352 response_handlers.lock().clear();
353 exit?;
354 output_done.recv().await;
355 log::debug!("language server shutdown finished");
356 drop(tasks);
357 Ok(())
358 }
359 .log_err(),
360 )
361 } else {
362 None
363 }
364 }
365
366 pub fn on_notification<T, F>(&mut self, f: F) -> Subscription
367 where
368 T: notification::Notification,
369 F: 'static + Send + Sync + FnMut(T::Params),
370 {
371 self.on_custom_notification(T::METHOD, f)
372 }
373
374 pub fn on_request<T, F>(&mut self, f: F) -> Subscription
375 where
376 T: request::Request,
377 F: 'static + Send + Sync + FnMut(T::Params) -> Result<T::Result>,
378 {
379 self.on_custom_request(T::METHOD, f)
380 }
381
382 pub fn on_custom_notification<Params, F>(
383 &mut self,
384 method: &'static str,
385 mut f: F,
386 ) -> Subscription
387 where
388 F: 'static + Send + Sync + FnMut(Params),
389 Params: DeserializeOwned,
390 {
391 let prev_handler = self.notification_handlers.write().insert(
392 method,
393 Box::new(move |_, params, _| {
394 let params = serde_json::from_str(params)?;
395 f(params);
396 Ok(())
397 }),
398 );
399 assert!(
400 prev_handler.is_none(),
401 "registered multiple handlers for the same LSP method"
402 );
403 Subscription {
404 method,
405 notification_handlers: self.notification_handlers.clone(),
406 }
407 }
408
409 pub fn on_custom_request<Params, Res, F>(
410 &mut self,
411 method: &'static str,
412 mut f: F,
413 ) -> Subscription
414 where
415 F: 'static + Send + Sync + FnMut(Params) -> Result<Res>,
416 Params: DeserializeOwned,
417 Res: Serialize,
418 {
419 let prev_handler = self.notification_handlers.write().insert(
420 method,
421 Box::new(move |id, params, tx| {
422 if let Some(id) = id {
423 let params = serde_json::from_str(params)?;
424 let result = f(params)?;
425 let response = serde_json::to_vec(&Response { id, result })?;
426 tx.try_send(response)?;
427 }
428 Ok(())
429 }),
430 );
431 assert!(
432 prev_handler.is_none(),
433 "registered multiple handlers for the same LSP method"
434 );
435 Subscription {
436 method,
437 notification_handlers: self.notification_handlers.clone(),
438 }
439 }
440
441 pub fn name<'a>(self: &'a Arc<Self>) -> &'a str {
442 &self.name
443 }
444
445 pub fn capabilities<'a>(self: &'a Arc<Self>) -> &'a ServerCapabilities {
446 &self.capabilities
447 }
448
449 pub fn request<T: request::Request>(
450 self: &Arc<Self>,
451 params: T::Params,
452 ) -> impl Future<Output = Result<T::Result>>
453 where
454 T::Result: 'static + Send,
455 {
456 Self::request_internal::<T>(
457 &self.next_id,
458 &self.response_handlers,
459 &self.outbound_tx,
460 params,
461 )
462 }
463
464 fn request_internal<T: request::Request>(
465 next_id: &AtomicUsize,
466 response_handlers: &Mutex<HashMap<usize, ResponseHandler>>,
467 outbound_tx: &channel::Sender<Vec<u8>>,
468 params: T::Params,
469 ) -> impl 'static + Future<Output = Result<T::Result>>
470 where
471 T::Result: 'static + Send,
472 {
473 let id = next_id.fetch_add(1, SeqCst);
474 let message = serde_json::to_vec(&Request {
475 jsonrpc: JSON_RPC_VERSION,
476 id,
477 method: T::METHOD,
478 params,
479 })
480 .unwrap();
481
482 let send = outbound_tx
483 .try_send(message)
484 .context("failed to write to language server's stdin");
485
486 let (tx, rx) = oneshot::channel();
487 response_handlers.lock().insert(
488 id,
489 Box::new(move |result| {
490 let response = match result {
491 Ok(response) => {
492 serde_json::from_str(response).context("failed to deserialize response")
493 }
494 Err(error) => Err(anyhow!("{}", error.message)),
495 };
496 let _ = tx.send(response);
497 }),
498 );
499
500 async move {
501 send?;
502 rx.await?
503 }
504 }
505
506 pub fn notify<T: notification::Notification>(&self, params: T::Params) -> Result<()> {
507 Self::notify_internal::<T>(&self.outbound_tx, params)
508 }
509
510 fn notify_internal<T: notification::Notification>(
511 outbound_tx: &channel::Sender<Vec<u8>>,
512 params: T::Params,
513 ) -> Result<()> {
514 let message = serde_json::to_vec(&Notification {
515 jsonrpc: JSON_RPC_VERSION,
516 method: T::METHOD,
517 params,
518 })
519 .unwrap();
520 outbound_tx.try_send(message)?;
521 Ok(())
522 }
523}
524
525impl Drop for LanguageServer {
526 fn drop(&mut self) {
527 if let Some(shutdown) = self.shutdown() {
528 self.executor.spawn(shutdown).detach();
529 }
530 }
531}
532
533impl Subscription {
534 pub fn detach(mut self) {
535 self.method = "";
536 }
537}
538
539impl Drop for Subscription {
540 fn drop(&mut self) {
541 self.notification_handlers.write().remove(self.method);
542 }
543}
544
545#[cfg(any(test, feature = "test-support"))]
546pub struct FakeLanguageServer {
547 handlers: FakeLanguageServerHandlers,
548 outgoing_tx: futures::channel::mpsc::UnboundedSender<Vec<u8>>,
549 incoming_rx: futures::channel::mpsc::UnboundedReceiver<Vec<u8>>,
550 _input_task: Task<Result<()>>,
551 _output_task: Task<Result<()>>,
552}
553
554#[cfg(any(test, feature = "test-support"))]
555type FakeLanguageServerHandlers = Arc<
556 Mutex<
557 HashMap<
558 &'static str,
559 Box<dyn Send + FnMut(usize, &[u8], gpui::AsyncAppContext) -> Vec<u8>>,
560 >,
561 >,
562>;
563
564#[cfg(any(test, feature = "test-support"))]
565impl LanguageServer {
566 pub fn full_capabilities() -> ServerCapabilities {
567 ServerCapabilities {
568 document_highlight_provider: Some(OneOf::Left(true)),
569 code_action_provider: Some(CodeActionProviderCapability::Simple(true)),
570 document_formatting_provider: Some(OneOf::Left(true)),
571 document_range_formatting_provider: Some(OneOf::Left(true)),
572 ..Default::default()
573 }
574 }
575
576 pub fn fake(cx: &mut gpui::MutableAppContext) -> (Self, FakeLanguageServer) {
577 Self::fake_with_capabilities(Self::full_capabilities(), cx)
578 }
579
580 pub fn fake_with_capabilities(
581 capabilities: ServerCapabilities,
582 cx: &mut gpui::MutableAppContext,
583 ) -> (Self, FakeLanguageServer) {
584 let (stdin_writer, stdin_reader) = async_pipe::pipe();
585 let (stdout_writer, stdout_reader) = async_pipe::pipe();
586
587 let mut fake = FakeLanguageServer::new(stdin_reader, stdout_writer, cx);
588 fake.handle_request::<request::Initialize, _>({
589 let capabilities = capabilities.clone();
590 move |_, _| InitializeResult {
591 capabilities: capabilities.clone(),
592 ..Default::default()
593 }
594 });
595
596 let executor = cx.background().clone();
597 let server =
598 Self::new_internal(stdin_writer, stdout_reader, Path::new("/"), None, executor);
599 (server, fake)
600 }
601}
602
603#[cfg(any(test, feature = "test-support"))]
604impl FakeLanguageServer {
605 fn new(
606 stdin: async_pipe::PipeReader,
607 stdout: async_pipe::PipeWriter,
608 cx: &mut gpui::MutableAppContext,
609 ) -> Self {
610 use futures::StreamExt as _;
611
612 let (incoming_tx, incoming_rx) = futures::channel::mpsc::unbounded();
613 let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded();
614 let handlers = FakeLanguageServerHandlers::default();
615
616 let input_task = cx.spawn(|cx| {
617 let handlers = handlers.clone();
618 let outgoing_tx = outgoing_tx.clone();
619 async move {
620 let mut buffer = Vec::new();
621 let mut stdin = smol::io::BufReader::new(stdin);
622 while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
623 cx.background().simulate_random_delay().await;
624
625 if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
626 assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
627
628 let response;
629 if let Some(handler) = handlers.lock().get_mut(request.method) {
630 response =
631 handler(request.id, request.params.get().as_bytes(), cx.clone());
632 log::debug!("handled lsp request. method:{}", request.method);
633 } else {
634 response = serde_json::to_vec(&AnyResponse {
635 id: request.id,
636 error: Some(Error {
637 message: "no handler".to_string(),
638 }),
639 result: None,
640 })
641 .unwrap();
642 log::debug!("unhandled lsp request. method:{}", request.method);
643 }
644 outgoing_tx.unbounded_send(response)?;
645 } else {
646 incoming_tx.unbounded_send(buffer.clone())?;
647 }
648 }
649 Ok::<_, anyhow::Error>(())
650 }
651 });
652
653 let output_task = cx.background().spawn(async move {
654 let mut stdout = smol::io::BufWriter::new(stdout);
655 while let Some(message) = outgoing_rx.next().await {
656 stdout
657 .write_all(CONTENT_LEN_HEADER.as_bytes())
658 .await
659 .unwrap();
660 stdout
661 .write_all((format!("{}", message.len())).as_bytes())
662 .await
663 .unwrap();
664 stdout.write_all("\r\n\r\n".as_bytes()).await.unwrap();
665 stdout.write_all(&message).await.unwrap();
666 stdout.flush().await.unwrap();
667 }
668 Ok(())
669 });
670
671 Self {
672 outgoing_tx,
673 incoming_rx,
674 handlers,
675 _input_task: input_task,
676 _output_task: output_task,
677 }
678 }
679
680 pub fn notify<T: notification::Notification>(&mut self, params: T::Params) {
681 let message = serde_json::to_vec(&Notification {
682 jsonrpc: JSON_RPC_VERSION,
683 method: T::METHOD,
684 params,
685 })
686 .unwrap();
687 self.outgoing_tx.unbounded_send(message).unwrap();
688 }
689
690 pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
691 use futures::StreamExt as _;
692
693 loop {
694 let bytes = self.incoming_rx.next().await.unwrap();
695 if let Ok(notification) = serde_json::from_slice::<Notification<T::Params>>(&bytes) {
696 assert_eq!(notification.method, T::METHOD);
697 return notification.params;
698 } else {
699 log::info!(
700 "skipping message in fake language server {:?}",
701 std::str::from_utf8(&bytes)
702 );
703 }
704 }
705 }
706
707 pub fn handle_request<T, F>(
708 &mut self,
709 mut handler: F,
710 ) -> futures::channel::mpsc::UnboundedReceiver<()>
711 where
712 T: 'static + request::Request,
713 F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> T::Result,
714 {
715 let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
716 self.handlers.lock().insert(
717 T::METHOD,
718 Box::new(move |id, params, cx| {
719 let result = handler(serde_json::from_slice::<T::Params>(params).unwrap(), cx);
720 let result = serde_json::to_string(&result).unwrap();
721 let result = serde_json::from_str::<&RawValue>(&result).unwrap();
722 let response = AnyResponse {
723 id,
724 error: None,
725 result: Some(result),
726 };
727 responded_tx.unbounded_send(()).ok();
728 serde_json::to_vec(&response).unwrap()
729 }),
730 );
731 responded_rx
732 }
733
734 pub fn remove_request_handler<T>(&mut self)
735 where
736 T: 'static + request::Request,
737 {
738 self.handlers.lock().remove(T::METHOD);
739 }
740
741 pub async fn start_progress(&mut self, token: impl Into<String>) {
742 self.notify::<notification::Progress>(ProgressParams {
743 token: NumberOrString::String(token.into()),
744 value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
745 });
746 }
747
748 pub async fn end_progress(&mut self, token: impl Into<String>) {
749 self.notify::<notification::Progress>(ProgressParams {
750 token: NumberOrString::String(token.into()),
751 value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
752 });
753 }
754
755 async fn receive(
756 stdin: &mut smol::io::BufReader<async_pipe::PipeReader>,
757 buffer: &mut Vec<u8>,
758 ) -> Result<()> {
759 buffer.clear();
760 stdin.read_until(b'\n', buffer).await?;
761 stdin.read_until(b'\n', buffer).await?;
762 let message_len: usize = std::str::from_utf8(buffer)
763 .unwrap()
764 .strip_prefix(CONTENT_LEN_HEADER)
765 .ok_or_else(|| anyhow!("invalid content length header"))?
766 .trim_end()
767 .parse()
768 .unwrap();
769 buffer.resize(message_len, 0);
770 stdin.read_exact(buffer).await?;
771 Ok(())
772 }
773}
774
775struct ClearResponseHandlers(Arc<Mutex<HashMap<usize, ResponseHandler>>>);
776
777impl Drop for ClearResponseHandlers {
778 fn drop(&mut self) {
779 self.0.lock().clear();
780 }
781}
782
783#[cfg(test)]
784mod tests {
785 use super::*;
786 use gpui::TestAppContext;
787
788 #[ctor::ctor]
789 fn init_logger() {
790 if std::env::var("RUST_LOG").is_ok() {
791 env_logger::init();
792 }
793 }
794
795 #[gpui::test]
796 async fn test_fake(cx: &mut TestAppContext) {
797 let (mut server, mut fake) = cx.update(LanguageServer::fake);
798
799 let (message_tx, message_rx) = channel::unbounded();
800 let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
801 server
802 .on_notification::<notification::ShowMessage, _>(move |params| {
803 message_tx.try_send(params).unwrap()
804 })
805 .detach();
806 server
807 .on_notification::<notification::PublishDiagnostics, _>(move |params| {
808 diagnostics_tx.try_send(params).unwrap()
809 })
810 .detach();
811
812 let server = server.initialize().await.unwrap();
813 server
814 .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
815 text_document: TextDocumentItem::new(
816 Url::from_str("file://a/b").unwrap(),
817 "rust".to_string(),
818 0,
819 "".to_string(),
820 ),
821 })
822 .unwrap();
823 assert_eq!(
824 fake.receive_notification::<notification::DidOpenTextDocument>()
825 .await
826 .text_document
827 .uri
828 .as_str(),
829 "file://a/b"
830 );
831
832 fake.notify::<notification::ShowMessage>(ShowMessageParams {
833 typ: MessageType::ERROR,
834 message: "ok".to_string(),
835 });
836 fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
837 uri: Url::from_str("file://b/c").unwrap(),
838 version: Some(5),
839 diagnostics: vec![],
840 });
841 assert_eq!(message_rx.recv().await.unwrap().message, "ok");
842 assert_eq!(
843 diagnostics_rx.recv().await.unwrap().uri.as_str(),
844 "file://b/c"
845 );
846
847 fake.handle_request::<request::Shutdown, _>(|_, _| ());
848
849 drop(server);
850 fake.receive_notification::<notification::Exit>().await;
851 }
852}