1use log::warn;
2pub use lsp_types::request::*;
3pub use lsp_types::*;
4
5use anyhow::{anyhow, Context, Result};
6use collections::HashMap;
7use futures::{channel::oneshot, io::BufWriter, select, AsyncRead, AsyncWrite, Future, FutureExt};
8use gpui::{AppContext, AsyncAppContext, BackgroundExecutor, Task};
9use parking_lot::Mutex;
10use postage::{barrier, prelude::Stream};
11use serde::{de::DeserializeOwned, Deserialize, Serialize};
12use serde_json::{json, value::RawValue, Value};
13use smol::{
14 channel,
15 io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
16 process::{self, Child},
17};
18
19#[cfg(target_os = "windows")]
20use smol::process::windows::CommandExt;
21
22use std::{
23 ffi::OsString,
24 fmt,
25 io::Write,
26 path::PathBuf,
27 pin::Pin,
28 str::{self, FromStr as _},
29 sync::{
30 atomic::{AtomicI32, Ordering::SeqCst},
31 Arc, Weak,
32 },
33 task::Poll,
34 time::{Duration, Instant},
35};
36use std::{path::Path, process::Stdio};
37use util::{ResultExt, TryFutureExt};
38
/// Byte sequence written between the header section and the body of a message.
const HEADER_DELIMITER: &[u8; 4] = b"\r\n\r\n";
/// Value placed in the `jsonrpc` field of every outgoing message.
const JSON_RPC_VERSION: &str = "2.0";
/// Prefix of the header line that carries the body length in bytes.
const CONTENT_LEN_HEADER: &str = "Content-Length: ";
/// How long a request waits for its response before failing with a timeout error.
const LSP_REQUEST_TIMEOUT: Duration = Duration::from_secs(60 * 2);
/// How long shutdown waits for the server to answer the `shutdown` request.
const SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
44
/// Callback for an incoming server message dispatched by method name. Receives the message id
/// (present when the server sent a request rather than a notification), the raw JSON text of
/// the params, and an app context.
type NotificationHandler = Box<dyn Send + FnMut(Option<RequestId>, &str, AsyncAppContext)>;
/// One-shot callback for the response to an outgoing request: either the raw JSON text of the
/// result or the error reported by the server.
type ResponseHandler = Box<dyn Send + FnOnce(Result<String, Error>)>;
/// Callback observing raw stdio traffic, tagged with the stream it belongs to.
type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
48
/// Kind of language server stdio given to an IO handler.
#[derive(Debug, Clone, Copy)]
pub enum IoKind {
    /// A message read from the server's stdout.
    StdOut,
    /// A message about to be written to the server's stdin.
    StdIn,
    /// A line read from the server's stderr.
    StdErr,
}
56
/// Represents a launchable language server. This can either be a standalone binary or the path
/// to a runtime with arguments to instruct it to launch the actual language server file.
#[derive(Debug, Clone, Deserialize)]
pub struct LanguageServerBinary {
    /// Program to execute.
    pub path: PathBuf,
    /// Command-line arguments passed to the program.
    pub arguments: Vec<OsString>,
    /// Extra environment variables for the process; `None` adds nothing.
    pub env: Option<HashMap<String, String>>,
}
65
/// A running language server process.
pub struct LanguageServer {
    /// Identifier assigned to this server by the caller.
    server_id: LanguageServerId,
    /// Counter handing out ids for outgoing requests and for IO handler registrations.
    next_id: AtomicI32,
    /// Queue of serialized messages waiting to be written to the server's stdin.
    outbound_tx: channel::Sender<String>,
    /// Display name: starts empty, is set from the binary's file name, and is replaced by the
    /// name the server reports during initialization (if any).
    name: Arc<str>,
    /// Capabilities reported by the server in its `initialize` response.
    capabilities: ServerCapabilities,
    code_action_kinds: Option<Vec<CodeActionKind>>,
    /// Handlers for incoming notifications *and* incoming requests, keyed by method name.
    notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
    /// Handlers awaiting responses to outgoing requests, keyed by request id. Becomes `None`
    /// once an IO loop ends or the server is shut down.
    response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
    /// Observers of raw stdio traffic, keyed by registration id.
    io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
    executor: BackgroundExecutor,
    /// The (input, output) IO tasks; taken by the first call to `shutdown`.
    #[allow(clippy::type_complexity)]
    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
    /// Barrier whose sender is owned by the output loop; taken by `shutdown`.
    output_done_rx: Mutex<Option<barrier::Receiver>>,
    root_path: PathBuf,
    /// Handle to the child process, if this server was spawned as one.
    server: Arc<Mutex<Option<Child>>>,
}
84
/// Identifies a running language server.
///
/// A transparent wrapper around a `usize`; its `Display` output is the bare number.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct LanguageServerId(pub usize);
89
/// Handle to a language server RPC activity subscription.
///
/// Dropping the handle unregisters the handler unless `detach` was called first.
pub enum Subscription {
    /// Subscription to incoming messages with a given method name.
    Notification {
        /// Method name the handler is registered under.
        method: &'static str,
        /// Map to remove the handler from on drop; `None` once detached.
        notification_handlers: Option<Arc<Mutex<HashMap<&'static str, NotificationHandler>>>>,
    },
    /// Subscription to raw stdio traffic.
    Io {
        /// Registration id of the handler.
        id: i32,
        /// Weak reference to the map to remove the handler from on drop; `None` once detached.
        io_handlers: Option<Weak<Mutex<HashMap<i32, IoHandler>>>>,
    },
}
101
/// Language server protocol RPC request message ID.
///
/// Serialized untagged, i.e. as a bare JSON number or string.
///
/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(untagged)]
pub enum RequestId {
    /// Numeric id; the form this client uses for its own requests.
    Int(i32),
    /// String id.
    Str(String),
}
111
/// Language server protocol RPC request message.
///
/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
#[derive(Serialize, Deserialize)]
pub struct Request<'a, T> {
    /// Protocol version string.
    jsonrpc: &'static str,
    /// Id the response is matched against.
    id: RequestId,
    /// Name of the method being invoked.
    method: &'a str,
    /// Method-specific parameters.
    params: T,
}
122
/// Language server protocol RPC request response message before it is deserialized into a concrete type.
#[derive(Serialize, Deserialize)]
struct AnyResponse<'a> {
    jsonrpc: &'a str,
    /// Id of the request this message answers.
    id: RequestId,
    /// Error reported by the peer; defaults to `None` when the field is absent.
    #[serde(default)]
    error: Option<Error>,
    /// Raw JSON of the result, borrowed from the input buffer.
    #[serde(borrow)]
    result: Option<&'a RawValue>,
}
133
/// Language server protocol RPC request response message.
///
/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#responseMessage)
#[derive(Serialize)]
struct Response<T> {
    jsonrpc: &'static str,
    /// Id of the request being answered.
    id: RequestId,
    /// Flattened into the message, so it appears as either a `result` or an `error` field.
    #[serde(flatten)]
    value: LspResult<T>,
}
144
/// Outcome carried by a [`Response`]: the variant name becomes the JSON field name.
#[derive(Serialize)]
#[serde(rename_all = "snake_case")]
enum LspResult<T> {
    /// Successful outcome, serialized under the key `result`.
    #[serde(rename = "result")]
    Ok(Option<T>),
    /// Failed outcome, serialized under the key `error`.
    Error(Option<Error>),
}
152
/// Language server protocol RPC notification message.
///
/// Unlike a request it carries no id, so no response is matched against it.
///
/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
#[derive(Serialize, Deserialize)]
struct Notification<'a, T> {
    jsonrpc: &'static str,
    /// Name of the notification method.
    #[serde(borrow)]
    method: &'a str,
    /// Method-specific parameters.
    params: T,
}
163
/// Language server RPC notification message before it is deserialized into a concrete type.
///
/// Also used for incoming requests, which are distinguished by carrying an `id`.
#[derive(Debug, Clone, Deserialize)]
struct AnyNotification<'a> {
    /// Request id; `None` when the field is absent.
    #[serde(default)]
    id: Option<RequestId>,
    /// Method name, borrowed from the input buffer.
    #[serde(borrow)]
    method: &'a str,
    /// Raw JSON of the params, borrowed from the input buffer; `None` when absent.
    #[serde(borrow, default)]
    params: Option<&'a RawValue>,
}
174
/// Error object of an RPC response. Only the `message` field is modeled here.
#[derive(Debug, Serialize, Deserialize)]
struct Error {
    /// Human-readable description of the failure.
    message: String,
}
179
/// A future for an in-flight request that can also report the id the request was sent with.
pub trait LspRequestFuture<O>: Future<Output = O> {
    /// Id of the request this future is waiting on.
    fn id(&self) -> i32;
}

/// Pairs a request's future with the id the request was issued under.
struct LspRequest<F> {
    id: i32,
    request: F,
}

impl<F> LspRequest<F> {
    /// Wraps `request`, remembering the `id` it was sent with.
    pub fn new(id: i32, request: F) -> Self {
        LspRequest { id, request }
    }
}

impl<F: Future> Future for LspRequest<F> {
    type Output = F::Output;

    /// Polls the wrapped future; the output is forwarded unchanged.
    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        // SAFETY: this is a structural pin projection onto `request`; the field is never
        // moved out of a pinned `LspRequest`.
        let request = unsafe { self.map_unchecked_mut(|this| &mut this.request) };
        request.poll(cx)
    }
}

impl<F: Future> LspRequestFuture<F::Output> for LspRequest<F> {
    fn id(&self) -> i32 {
        self.id
    }
}
210
/// Experimental: Informs the end user about the state of the server
///
/// This is an uninhabited marker type: it only exists to carry the notification's method name
/// and parameter type.
///
/// [Rust Analyzer Specification](https://github.com/rust-lang/rust-analyzer/blob/master/docs/dev/lsp-extensions.md#server-status)
#[derive(Debug)]
pub enum ServerStatus {}
216
/// Health reported by the server in a server-status notification.
///
/// Other(String) variant to handle unknown values due to this still being experimental
// NOTE(review): with the derived externally-tagged representation, `Other` presumably only
// matches `{"other": "..."}` rather than arbitrary unknown strings — confirm.
#[derive(Debug, PartialEq, Deserialize, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub enum ServerHealthStatus {
    Ok,
    Warning,
    Error,
    Other(String),
}
226
/// Parameters of the `experimental/serverStatus` notification.
#[derive(Debug, PartialEq, Deserialize, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ServerStatusParams {
    /// Health level reported by the server.
    pub health: ServerHealthStatus,
    /// Optional text accompanying the health level.
    pub message: Option<String>,
}
233
/// Registers [`ServerStatus`] as a notification with its method name and parameter type.
impl lsp_types::notification::Notification for ServerStatus {
    type Params = ServerStatusParams;
    const METHOD: &'static str = "experimental/serverStatus";
}
238
239impl LanguageServer {
240 /// Starts a language server process.
241 pub fn new(
242 stderr_capture: Arc<Mutex<Option<String>>>,
243 server_id: LanguageServerId,
244 binary: LanguageServerBinary,
245 root_path: &Path,
246 code_action_kinds: Option<Vec<CodeActionKind>>,
247 cx: AsyncAppContext,
248 ) -> Result<Self> {
249 let working_dir = if root_path.is_dir() {
250 root_path
251 } else {
252 root_path.parent().unwrap_or_else(|| Path::new("/"))
253 };
254
255 log::info!(
256 "starting language server. binary path: {:?}, working directory: {:?}, args: {:?}",
257 binary.path,
258 working_dir,
259 &binary.arguments
260 );
261
262 let mut command = process::Command::new(&binary.path);
263 command
264 .current_dir(working_dir)
265 .args(binary.arguments)
266 .envs(binary.env.unwrap_or_default())
267 .stdin(Stdio::piped())
268 .stdout(Stdio::piped())
269 .stderr(Stdio::piped())
270 .kill_on_drop(true);
271 #[cfg(windows)]
272 command.creation_flags(windows::Win32::System::Threading::CREATE_NO_WINDOW.0);
273 let mut server = command.spawn()?;
274
275 let stdin = server.stdin.take().unwrap();
276 let stdout = server.stdout.take().unwrap();
277 let stderr = server.stderr.take().unwrap();
278 let mut server = Self::new_internal(
279 server_id,
280 stdin,
281 stdout,
282 Some(stderr),
283 stderr_capture,
284 Some(server),
285 root_path,
286 code_action_kinds,
287 cx,
288 move |notification| {
289 log::info!(
290 "Language server with id {} sent unhandled notification {}:\n{}",
291 server_id,
292 notification.method,
293 serde_json::to_string_pretty(
294 ¬ification
295 .params
296 .and_then(|params| Value::from_str(params.get()).ok())
297 .unwrap_or(Value::Null)
298 )
299 .unwrap(),
300 );
301 },
302 );
303
304 if let Some(name) = binary.path.file_name() {
305 server.name = name.to_string_lossy().into();
306 }
307
308 Ok(server)
309 }
310
    /// Builds a [`LanguageServer`] on top of arbitrary stdio streams and starts its IO tasks.
    ///
    /// `server` is the child process handle when there is one. `on_unhandled_notification` is
    /// called for incoming messages whose method has no registered handler. The returned
    /// server has an empty name and default capabilities.
    #[allow(clippy::too_many_arguments)]
    fn new_internal<Stdin, Stdout, Stderr, F>(
        server_id: LanguageServerId,
        stdin: Stdin,
        stdout: Stdout,
        stderr: Option<Stderr>,
        stderr_capture: Arc<Mutex<Option<String>>>,
        server: Option<Child>,
        root_path: &Path,
        code_action_kinds: Option<Vec<CodeActionKind>>,
        cx: AsyncAppContext,
        on_unhandled_notification: F,
    ) -> Self
    where
        Stdin: AsyncWrite + Unpin + Send + 'static,
        Stdout: AsyncRead + Unpin + Send + 'static,
        Stderr: AsyncRead + Unpin + Send + 'static,
        F: FnMut(AnyNotification) + 'static + Send + Sync + Clone,
    {
        let (outbound_tx, outbound_rx) = channel::unbounded::<String>();
        let (output_done_tx, output_done_rx) = barrier::channel();
        let notification_handlers =
            Arc::new(Mutex::new(HashMap::<_, NotificationHandler>::default()));
        let response_handlers =
            Arc::new(Mutex::new(Some(HashMap::<_, ResponseHandler>::default())));
        let io_handlers = Arc::new(Mutex::new(HashMap::default()));

        // Reads and dispatches messages arriving on the server's stdout.
        let stdout_input_task = cx.spawn({
            let on_unhandled_notification = on_unhandled_notification.clone();
            let notification_handlers = notification_handlers.clone();
            let response_handlers = response_handlers.clone();
            let io_handlers = io_handlers.clone();
            move |cx| {
                Self::handle_input(
                    stdout,
                    on_unhandled_notification,
                    notification_handlers,
                    response_handlers,
                    io_handlers,
                    cx,
                )
                .log_err()
            }
        });
        // Forwards stderr lines to IO handlers; an already-finished task when there is no stderr.
        let stderr_input_task = stderr
            .map(|stderr| {
                let io_handlers = io_handlers.clone();
                let stderr_captures = stderr_capture.clone();
                cx.spawn(|_| Self::handle_stderr(stderr, io_handlers, stderr_captures).log_err())
            })
            .unwrap_or_else(|| Task::Ready(Some(None)));
        // Completes once both readers have finished.
        let input_task = cx.spawn(|_| async move {
            let (stdout, stderr) = futures::join!(stdout_input_task, stderr_input_task);
            stdout.or(stderr)
        });
        // Writes queued outgoing messages to the server's stdin on the background executor.
        let output_task = cx.background_executor().spawn({
            Self::handle_output(
                stdin,
                outbound_rx,
                output_done_tx,
                response_handlers.clone(),
                io_handlers.clone(),
            )
            .log_err()
        });

        Self {
            server_id,
            notification_handlers,
            response_handlers,
            io_handlers,
            name: "".into(),
            capabilities: Default::default(),
            code_action_kinds,
            next_id: Default::default(),
            outbound_tx,
            executor: cx.background_executor().clone(),
            io_tasks: Mutex::new(Some((input_task, output_task))),
            output_done_rx: Mutex::new(Some(output_done_rx)),
            root_path: root_path.to_path_buf(),
            server: Arc::new(Mutex::new(server)),
        }
    }
394
395 /// List of code action kinds this language server reports being able to emit.
396 pub fn code_action_kinds(&self) -> Option<Vec<CodeActionKind>> {
397 self.code_action_kinds.clone()
398 }
399
400 async fn handle_input<Stdout, F>(
401 stdout: Stdout,
402 mut on_unhandled_notification: F,
403 notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
404 response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
405 io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
406 cx: AsyncAppContext,
407 ) -> anyhow::Result<()>
408 where
409 Stdout: AsyncRead + Unpin + Send + 'static,
410 F: FnMut(AnyNotification) + 'static + Send,
411 {
412 let mut stdout = BufReader::new(stdout);
413 let _clear_response_handlers = util::defer({
414 let response_handlers = response_handlers.clone();
415 move || {
416 response_handlers.lock().take();
417 }
418 });
419 let mut buffer = Vec::new();
420 loop {
421 buffer.clear();
422
423 read_headers(&mut stdout, &mut buffer).await?;
424
425 let headers = std::str::from_utf8(&buffer)?;
426
427 let message_len = headers
428 .split('\n')
429 .find(|line| line.starts_with(CONTENT_LEN_HEADER))
430 .and_then(|line| line.strip_prefix(CONTENT_LEN_HEADER))
431 .ok_or_else(|| anyhow!("invalid LSP message header {headers:?}"))?
432 .trim_end()
433 .parse()?;
434
435 buffer.resize(message_len, 0);
436 stdout.read_exact(&mut buffer).await?;
437
438 if let Ok(message) = str::from_utf8(&buffer) {
439 log::trace!("incoming message: {message}");
440 for handler in io_handlers.lock().values_mut() {
441 handler(IoKind::StdOut, message);
442 }
443 }
444
445 if let Ok(msg) = serde_json::from_slice::<AnyNotification>(&buffer) {
446 if let Some(handler) = notification_handlers.lock().get_mut(msg.method) {
447 handler(
448 msg.id,
449 msg.params.map(|params| params.get()).unwrap_or("null"),
450 cx.clone(),
451 );
452 } else {
453 on_unhandled_notification(msg);
454 }
455 } else if let Ok(AnyResponse {
456 id, error, result, ..
457 }) = serde_json::from_slice(&buffer)
458 {
459 if let Some(handler) = response_handlers
460 .lock()
461 .as_mut()
462 .and_then(|handlers| handlers.remove(&id))
463 {
464 if let Some(error) = error {
465 handler(Err(error));
466 } else if let Some(result) = result {
467 handler(Ok(result.get().into()));
468 } else {
469 handler(Ok("null".into()));
470 }
471 }
472 } else {
473 warn!(
474 "failed to deserialize LSP message:\n{}",
475 std::str::from_utf8(&buffer)?
476 );
477 }
478
479 // Don't starve the main thread when receiving lots of messages at once.
480 smol::future::yield_now().await;
481 }
482 }
483
484 async fn handle_stderr<Stderr>(
485 stderr: Stderr,
486 io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
487 stderr_capture: Arc<Mutex<Option<String>>>,
488 ) -> anyhow::Result<()>
489 where
490 Stderr: AsyncRead + Unpin + Send + 'static,
491 {
492 let mut stderr = BufReader::new(stderr);
493 let mut buffer = Vec::new();
494
495 loop {
496 buffer.clear();
497
498 let bytes_read = stderr.read_until(b'\n', &mut buffer).await?;
499 if bytes_read == 0 {
500 return Ok(());
501 }
502
503 if let Ok(message) = str::from_utf8(&buffer) {
504 log::trace!("incoming stderr message:{message}");
505 for handler in io_handlers.lock().values_mut() {
506 handler(IoKind::StdErr, message);
507 }
508
509 if let Some(stderr) = stderr_capture.lock().as_mut() {
510 stderr.push_str(message);
511 }
512 }
513
514 // Don't starve the main thread when receiving lots of messages at once.
515 smol::future::yield_now().await;
516 }
517 }
518
519 async fn handle_output<Stdin>(
520 stdin: Stdin,
521 outbound_rx: channel::Receiver<String>,
522 output_done_tx: barrier::Sender,
523 response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
524 io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
525 ) -> anyhow::Result<()>
526 where
527 Stdin: AsyncWrite + Unpin + Send + 'static,
528 {
529 let mut stdin = BufWriter::new(stdin);
530 let _clear_response_handlers = util::defer({
531 let response_handlers = response_handlers.clone();
532 move || {
533 response_handlers.lock().take();
534 }
535 });
536 let mut content_len_buffer = Vec::new();
537 while let Ok(message) = outbound_rx.recv().await {
538 log::trace!("outgoing message:{}", message);
539 for handler in io_handlers.lock().values_mut() {
540 handler(IoKind::StdIn, &message);
541 }
542
543 content_len_buffer.clear();
544 write!(content_len_buffer, "{}", message.len()).unwrap();
545 stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
546 stdin.write_all(&content_len_buffer).await?;
547 stdin.write_all("\r\n\r\n".as_bytes()).await?;
548 stdin.write_all(message.as_bytes()).await?;
549 stdin.flush().await?;
550 }
551 drop(output_done_tx);
552 Ok(())
553 }
554
555 /// Initializes a language server by sending the `Initialize` request.
556 /// Note that `options` is used directly to construct [`InitializeParams`], which is why it is owned.
557 ///
558 /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#initialize)
559 pub fn initialize(
560 mut self,
561 options: Option<Value>,
562 cx: &AppContext,
563 ) -> Task<Result<Arc<Self>>> {
564 let root_uri = Url::from_file_path(&self.root_path).unwrap();
565 #[allow(deprecated)]
566 let params = InitializeParams {
567 process_id: None,
568 root_path: None,
569 root_uri: Some(root_uri.clone()),
570 initialization_options: options,
571 capabilities: ClientCapabilities {
572 workspace: Some(WorkspaceClientCapabilities {
573 configuration: Some(true),
574 did_change_watched_files: Some(DidChangeWatchedFilesClientCapabilities {
575 dynamic_registration: Some(true),
576 relative_pattern_support: Some(true),
577 }),
578 did_change_configuration: Some(DynamicRegistrationClientCapabilities {
579 dynamic_registration: Some(true),
580 }),
581 workspace_folders: Some(true),
582 symbol: Some(WorkspaceSymbolClientCapabilities {
583 resolve_support: None,
584 ..WorkspaceSymbolClientCapabilities::default()
585 }),
586 inlay_hint: Some(InlayHintWorkspaceClientCapabilities {
587 refresh_support: Some(true),
588 }),
589 diagnostic: Some(DiagnosticWorkspaceClientCapabilities {
590 refresh_support: None,
591 }),
592 workspace_edit: Some(WorkspaceEditClientCapabilities {
593 resource_operations: Some(vec![
594 ResourceOperationKind::Create,
595 ResourceOperationKind::Rename,
596 ResourceOperationKind::Delete,
597 ]),
598 document_changes: Some(true),
599 ..WorkspaceEditClientCapabilities::default()
600 }),
601 ..Default::default()
602 }),
603 text_document: Some(TextDocumentClientCapabilities {
604 definition: Some(GotoCapability {
605 link_support: Some(true),
606 dynamic_registration: None,
607 }),
608 code_action: Some(CodeActionClientCapabilities {
609 code_action_literal_support: Some(CodeActionLiteralSupport {
610 code_action_kind: CodeActionKindLiteralSupport {
611 value_set: vec![
612 CodeActionKind::REFACTOR.as_str().into(),
613 CodeActionKind::QUICKFIX.as_str().into(),
614 CodeActionKind::SOURCE.as_str().into(),
615 ],
616 },
617 }),
618 data_support: Some(true),
619 resolve_support: Some(CodeActionCapabilityResolveSupport {
620 properties: vec![
621 "kind".to_string(),
622 "diagnostics".to_string(),
623 "isPreferred".to_string(),
624 "disabled".to_string(),
625 "edit".to_string(),
626 "command".to_string(),
627 ],
628 }),
629 ..Default::default()
630 }),
631 completion: Some(CompletionClientCapabilities {
632 completion_item: Some(CompletionItemCapability {
633 snippet_support: Some(true),
634 resolve_support: Some(CompletionItemCapabilityResolveSupport {
635 properties: vec![
636 "documentation".to_string(),
637 "additionalTextEdits".to_string(),
638 ],
639 }),
640 insert_replace_support: Some(true),
641 ..Default::default()
642 }),
643 completion_list: Some(CompletionListCapability {
644 item_defaults: Some(vec![
645 "commitCharacters".to_owned(),
646 "editRange".to_owned(),
647 "insertTextMode".to_owned(),
648 "data".to_owned(),
649 ]),
650 }),
651 ..Default::default()
652 }),
653 rename: Some(RenameClientCapabilities {
654 prepare_support: Some(true),
655 ..Default::default()
656 }),
657 hover: Some(HoverClientCapabilities {
658 content_format: Some(vec![MarkupKind::Markdown]),
659 dynamic_registration: None,
660 }),
661 inlay_hint: Some(InlayHintClientCapabilities {
662 resolve_support: Some(InlayHintResolveClientCapabilities {
663 properties: vec![
664 "textEdits".to_string(),
665 "tooltip".to_string(),
666 "label.tooltip".to_string(),
667 "label.location".to_string(),
668 "label.command".to_string(),
669 ],
670 }),
671 dynamic_registration: Some(false),
672 }),
673 publish_diagnostics: Some(PublishDiagnosticsClientCapabilities {
674 related_information: Some(true),
675 ..Default::default()
676 }),
677 formatting: Some(DynamicRegistrationClientCapabilities {
678 dynamic_registration: None,
679 }),
680 on_type_formatting: Some(DynamicRegistrationClientCapabilities {
681 dynamic_registration: None,
682 }),
683 diagnostic: Some(DiagnosticClientCapabilities {
684 related_document_support: Some(true),
685 dynamic_registration: None,
686 }),
687 ..Default::default()
688 }),
689 experimental: Some(json!({
690 "serverStatusNotification": true,
691 })),
692 window: Some(WindowClientCapabilities {
693 work_done_progress: Some(true),
694 ..Default::default()
695 }),
696 general: None,
697 },
698 trace: None,
699 workspace_folders: Some(vec![WorkspaceFolder {
700 uri: root_uri,
701 name: Default::default(),
702 }]),
703 client_info: release_channel::ReleaseChannel::try_global(cx).map(|release_channel| {
704 ClientInfo {
705 name: release_channel.display_name().to_string(),
706 version: Some(release_channel::AppVersion::global(cx).to_string()),
707 }
708 }),
709 locale: None,
710 };
711
712 cx.spawn(|_| async move {
713 let response = self.request::<request::Initialize>(params).await?;
714 if let Some(info) = response.server_info {
715 self.name = info.name.into();
716 }
717 self.capabilities = response.capabilities;
718
719 self.notify::<notification::Initialized>(InitializedParams {})?;
720 Ok(Arc::new(self))
721 })
722 }
723
    /// Sends a shutdown request to the language server process and prepares the [`LanguageServer`] to be dropped.
    ///
    /// Returns `None` if the IO tasks were already taken by an earlier call. Otherwise the
    /// `shutdown` request and `exit` notification are queued immediately and the outbound
    /// channel is closed; the returned future performs the remaining waiting and cleanup and
    /// resolves to `None` if any step failed (the failure is logged).
    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
        if let Some(tasks) = self.io_tasks.lock().take() {
            let response_handlers = self.response_handlers.clone();
            // Snapshot the id counter into an owned atomic for the request built below.
            let next_id = AtomicI32::new(self.next_id.load(SeqCst));
            let outbound_tx = self.outbound_tx.clone();
            let executor = self.executor.clone();
            let mut output_done = self.output_done_rx.lock().take().unwrap();
            let shutdown_request = Self::request_internal::<request::Shutdown>(
                &next_id,
                &response_handlers,
                &outbound_tx,
                &executor,
                (),
            );
            let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
            // Nothing more may be queued after `exit`.
            outbound_tx.close();

            let server = self.server.clone();
            let name = self.name.clone();
            let mut timer = self.executor.timer(SERVER_SHUTDOWN_TIMEOUT).fuse();
            Some(
                async move {
                    log::debug!("language server shutdown started");

                    // Wait for the shutdown response, but no longer than the timeout.
                    select! {
                        request_result = shutdown_request.fuse() => {
                            request_result?;
                        }

                        _ = timer => {
                            log::info!("timeout waiting for language server {name} to shutdown");
                        },
                    }

                    response_handlers.lock().take();
                    exit?;
                    // Wait for the output loop to finish before killing the process.
                    output_done.recv().await;
                    server.lock().take().map(|mut child| child.kill());
                    log::debug!("language server shutdown finished");

                    drop(tasks);
                    anyhow::Ok(())
                }
                .log_err(),
            )
        } else {
            None
        }
    }
774
775 /// Register a handler to handle incoming LSP notifications.
776 ///
777 /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
778 #[must_use]
779 pub fn on_notification<T, F>(&self, f: F) -> Subscription
780 where
781 T: notification::Notification,
782 F: 'static + Send + FnMut(T::Params, AsyncAppContext),
783 {
784 self.on_custom_notification(T::METHOD, f)
785 }
786
787 /// Register a handler to handle incoming LSP requests.
788 ///
789 /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
790 #[must_use]
791 pub fn on_request<T, F, Fut>(&self, f: F) -> Subscription
792 where
793 T: request::Request,
794 T::Params: 'static + Send,
795 F: 'static + FnMut(T::Params, AsyncAppContext) -> Fut + Send,
796 Fut: 'static + Future<Output = Result<T::Result>>,
797 {
798 self.on_custom_request(T::METHOD, f)
799 }
800
801 /// Registers a handler to inspect all language server process stdio.
802 #[must_use]
803 pub fn on_io<F>(&self, f: F) -> Subscription
804 where
805 F: 'static + Send + FnMut(IoKind, &str),
806 {
807 let id = self.next_id.fetch_add(1, SeqCst);
808 self.io_handlers.lock().insert(id, Box::new(f));
809 Subscription::Io {
810 id,
811 io_handlers: Some(Arc::downgrade(&self.io_handlers)),
812 }
813 }
814
815 /// Removes a request handler registers via [`Self::on_request`].
816 pub fn remove_request_handler<T: request::Request>(&self) {
817 self.notification_handlers.lock().remove(T::METHOD);
818 }
819
820 /// Removes a notification handler registers via [`Self::on_notification`].
821 pub fn remove_notification_handler<T: notification::Notification>(&self) {
822 self.notification_handlers.lock().remove(T::METHOD);
823 }
824
825 /// Checks if a notification handler has been registered via [`Self::on_notification`].
826 pub fn has_notification_handler<T: notification::Notification>(&self) -> bool {
827 self.notification_handlers.lock().contains_key(T::METHOD)
828 }
829
830 #[must_use]
831 fn on_custom_notification<Params, F>(&self, method: &'static str, mut f: F) -> Subscription
832 where
833 F: 'static + FnMut(Params, AsyncAppContext) + Send,
834 Params: DeserializeOwned,
835 {
836 let prev_handler = self.notification_handlers.lock().insert(
837 method,
838 Box::new(move |_, params, cx| {
839 if let Some(params) = serde_json::from_str(params).log_err() {
840 f(params, cx);
841 }
842 }),
843 );
844 assert!(
845 prev_handler.is_none(),
846 "registered multiple handlers for the same LSP method"
847 );
848 Subscription::Notification {
849 method,
850 notification_handlers: Some(self.notification_handlers.clone()),
851 }
852 }
853
854 #[must_use]
855 fn on_custom_request<Params, Res, Fut, F>(&self, method: &'static str, mut f: F) -> Subscription
856 where
857 F: 'static + FnMut(Params, AsyncAppContext) -> Fut + Send,
858 Fut: 'static + Future<Output = Result<Res>>,
859 Params: DeserializeOwned + Send + 'static,
860 Res: Serialize,
861 {
862 let outbound_tx = self.outbound_tx.clone();
863 let prev_handler = self.notification_handlers.lock().insert(
864 method,
865 Box::new(move |id, params, cx| {
866 if let Some(id) = id {
867 match serde_json::from_str(params) {
868 Ok(params) => {
869 let response = f(params, cx.clone());
870 cx.foreground_executor()
871 .spawn({
872 let outbound_tx = outbound_tx.clone();
873 async move {
874 let response = match response.await {
875 Ok(result) => Response {
876 jsonrpc: JSON_RPC_VERSION,
877 id,
878 value: LspResult::Ok(Some(result)),
879 },
880 Err(error) => Response {
881 jsonrpc: JSON_RPC_VERSION,
882 id,
883 value: LspResult::Error(Some(Error {
884 message: error.to_string(),
885 })),
886 },
887 };
888 if let Some(response) =
889 serde_json::to_string(&response).log_err()
890 {
891 outbound_tx.try_send(response).ok();
892 }
893 }
894 })
895 .detach();
896 }
897
898 Err(error) => {
899 log::error!(
900 "error deserializing {} request: {:?}, message: {:?}",
901 method,
902 error,
903 params
904 );
905 let response = AnyResponse {
906 jsonrpc: JSON_RPC_VERSION,
907 id,
908 result: None,
909 error: Some(Error {
910 message: error.to_string(),
911 }),
912 };
913 if let Some(response) = serde_json::to_string(&response).log_err() {
914 outbound_tx.try_send(response).ok();
915 }
916 }
917 }
918 }
919 }),
920 );
921 assert!(
922 prev_handler.is_none(),
923 "registered multiple handlers for the same LSP method"
924 );
925 Subscription::Notification {
926 method,
927 notification_handlers: Some(self.notification_handlers.clone()),
928 }
929 }
930
931 /// Get the name of the running language server.
932 pub fn name(&self) -> &str {
933 &self.name
934 }
935
936 /// Get the reported capabilities of the running language server.
937 pub fn capabilities(&self) -> &ServerCapabilities {
938 &self.capabilities
939 }
940
941 /// Get the id of the running language server.
942 pub fn server_id(&self) -> LanguageServerId {
943 self.server_id
944 }
945
946 /// Get the root path of the project the language server is running against.
947 pub fn root_path(&self) -> &PathBuf {
948 &self.root_path
949 }
950
951 /// Sends a RPC request to the language server.
952 ///
953 /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
954 pub fn request<T: request::Request>(
955 &self,
956 params: T::Params,
957 ) -> impl LspRequestFuture<Result<T::Result>>
958 where
959 T::Result: 'static + Send,
960 {
961 Self::request_internal::<T>(
962 &self.next_id,
963 &self.response_handlers,
964 &self.outbound_tx,
965 &self.executor,
966 params,
967 )
968 }
969
    /// Builds and queues a request of type `T`, returning a future that resolves to its result.
    ///
    /// The id is taken from `next_id`, a response handler is registered under it, and the
    /// serialized request is queued on `outbound_tx` — all before the returned future is first
    /// polled. The future fails if the handlers map is gone ("server shut down"), if queuing
    /// failed, if the server answered with an error, if the result cannot be deserialized, or
    /// if no response arrives within `LSP_REQUEST_TIMEOUT`.
    fn request_internal<T: request::Request>(
        next_id: &AtomicI32,
        response_handlers: &Mutex<Option<HashMap<RequestId, ResponseHandler>>>,
        outbound_tx: &channel::Sender<String>,
        executor: &BackgroundExecutor,
        params: T::Params,
    ) -> impl LspRequestFuture<Result<T::Result>>
    where
        T::Result: 'static + Send,
    {
        let id = next_id.fetch_add(1, SeqCst);
        let message = serde_json::to_string(&Request {
            jsonrpc: JSON_RPC_VERSION,
            id: RequestId::Int(id),
            method: T::METHOD,
            params,
        })
        .unwrap();

        let (tx, rx) = oneshot::channel();
        // Register the handler that turns the raw response into `T::Result`; the
        // deserialization itself is spawned on `executor`.
        let handle_response = response_handlers
            .lock()
            .as_mut()
            .ok_or_else(|| anyhow!("server shut down"))
            .map(|handlers| {
                let executor = executor.clone();
                handlers.insert(
                    RequestId::Int(id),
                    Box::new(move |result| {
                        executor
                            .spawn(async move {
                                let response = match result {
                                    Ok(response) => match serde_json::from_str(&response) {
                                        Ok(deserialized) => Ok(deserialized),
                                        Err(error) => {
                                            log::error!("failed to deserialize response from language server: {}. response from language server: {:?}", error, response);
                                            Err(error).context("failed to deserialize response")
                                        }
                                    }
                                    Err(error) => Err(anyhow!("{}", error.message)),
                                };
                                _ = tx.send(response);
                            })
                            .detach();
                    }),
                );
            });

        let send = outbound_tx
            .try_send(message)
            .context("failed to write to language server's stdin");

        // Hold the sender weakly so the pending future does not keep the channel alive.
        let outbound_tx = outbound_tx.downgrade();
        let mut timeout = executor.timer(LSP_REQUEST_TIMEOUT).fuse();
        let started = Instant::now();
        LspRequest::new(id, async move {
            handle_response?;
            send?;

            // Sends a cancel notification for this request unless aborted below, i.e. when
            // the future is dropped early or the request times out.
            let cancel_on_drop = util::defer(move || {
                if let Some(outbound_tx) = outbound_tx.upgrade() {
                    Self::notify_internal::<notification::Cancel>(
                        &outbound_tx,
                        CancelParams {
                            id: NumberOrString::Number(id),
                        },
                    )
                    .log_err();
                }
            });

            let method = T::METHOD;
            select! {
                response = rx.fuse() => {
                    let elapsed = started.elapsed();
                    log::trace!("Took {elapsed:?} to receive response to {method:?} id {id}");
                    cancel_on_drop.abort();
                    response?
                }

                _ = timeout => {
                    log::error!("Cancelled LSP request task for {method:?} id {id} which took over {LSP_REQUEST_TIMEOUT:?}");
                    anyhow::bail!("LSP request timeout");
                }
            }
        })
    }
1057
1058 /// Sends a RPC notification to the language server.
1059 ///
1060 /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
1061 pub fn notify<T: notification::Notification>(&self, params: T::Params) -> Result<()> {
1062 Self::notify_internal::<T>(&self.outbound_tx, params)
1063 }
1064
1065 fn notify_internal<T: notification::Notification>(
1066 outbound_tx: &channel::Sender<String>,
1067 params: T::Params,
1068 ) -> Result<()> {
1069 let message = serde_json::to_string(&Notification {
1070 jsonrpc: JSON_RPC_VERSION,
1071 method: T::METHOD,
1072 params,
1073 })
1074 .unwrap();
1075 outbound_tx.try_send(message)?;
1076 Ok(())
1077 }
1078}
1079
1080impl Drop for LanguageServer {
1081 fn drop(&mut self) {
1082 if let Some(shutdown) = self.shutdown() {
1083 self.executor.spawn(shutdown).detach();
1084 }
1085 }
1086}
1087
1088impl Subscription {
1089 /// Detaching a subscription handle prevents it from unsubscribing on drop.
1090 pub fn detach(&mut self) {
1091 match self {
1092 Subscription::Notification {
1093 notification_handlers,
1094 ..
1095 } => *notification_handlers = None,
1096 Subscription::Io { io_handlers, .. } => *io_handlers = None,
1097 }
1098 }
1099}
1100
1101impl fmt::Display for LanguageServerId {
1102 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1103 self.0.fmt(f)
1104 }
1105}
1106
1107impl fmt::Debug for LanguageServer {
1108 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1109 f.debug_struct("LanguageServer")
1110 .field("id", &self.server_id.0)
1111 .field("name", &self.name)
1112 .finish_non_exhaustive()
1113 }
1114}
1115
1116impl Drop for Subscription {
1117 fn drop(&mut self) {
1118 match self {
1119 Subscription::Notification {
1120 method,
1121 notification_handlers,
1122 } => {
1123 if let Some(handlers) = notification_handlers {
1124 handlers.lock().remove(method);
1125 }
1126 }
1127 Subscription::Io { id, io_handlers } => {
1128 if let Some(io_handlers) = io_handlers.as_ref().and_then(|h| h.upgrade()) {
1129 io_handlers.lock().remove(id);
1130 }
1131 }
1132 }
1133 }
1134}
1135
/// Mock language server for use in tests.
///
/// Clones share the same underlying [`LanguageServer`] through the `Arc` in `server`.
#[cfg(any(test, feature = "test-support"))]
#[derive(Clone)]
pub struct FakeLanguageServer {
    /// The binary description passed to [`FakeLanguageServer::new`], stored unchanged.
    pub binary: LanguageServerBinary,
    /// The [`LanguageServer`] instance that `new` wires to the opposite ends of the
    /// pipes used by the client-side server it returns.
    pub server: Arc<LanguageServer>,
    /// `(method, params)` pairs pushed by the callback that `new` passes to
    /// `LanguageServer::new_internal`; `params` is the raw JSON text, or `"null"`
    /// when the message carried none. Drained by `try_receive_notification`.
    notifications_rx: channel::Receiver<(String, String)>,
}
1144
#[cfg(any(test, feature = "test-support"))]
impl FakeLanguageServer {
    /// Construct a fake language server.
    ///
    /// Returns a pair of servers connected back-to-back through two in-memory pipes:
    /// the first element writes to the pipe the fake reads from and vice versa. The
    /// fake is pre-configured to answer `Initialize` requests with the given
    /// `capabilities` and `name`.
    pub fn new(
        server_id: LanguageServerId,
        binary: LanguageServerBinary,
        name: String,
        capabilities: ServerCapabilities,
        cx: AsyncAppContext,
    ) -> (LanguageServer, FakeLanguageServer) {
        let (stdin_writer, stdin_reader) = async_pipe::pipe();
        let (stdout_writer, stdout_reader) = async_pipe::pipe();
        let (notifications_tx, notifications_rx) = channel::unbounded();

        // Client-facing half: writes to "stdin", reads from "stdout".
        let mut server = LanguageServer::new_internal(
            server_id,
            stdin_writer,
            stdout_reader,
            None::<async_pipe::PipeReader>,
            Arc::new(Mutex::new(None)),
            None,
            Path::new("/"),
            None,
            cx.clone(),
            |_| {},
        );
        server.name = name.as_str().into();

        // Fake half: the same pipes with the directions swapped. Messages handed to
        // its callback are recorded so tests can inspect them later.
        let mut fake_inner = LanguageServer::new_internal(
            server_id,
            stdout_writer,
            stdin_reader,
            None::<async_pipe::PipeReader>,
            Arc::new(Mutex::new(None)),
            None,
            Path::new("/"),
            None,
            cx,
            move |msg| {
                let method = msg.method.to_string();
                let params = msg
                    .params
                    .map(|raw_value| raw_value.get())
                    .unwrap_or("null")
                    .to_string();
                notifications_tx.try_send((method, params)).ok();
            },
        );
        fake_inner.name = name.as_str().into();

        let fake = FakeLanguageServer {
            binary,
            server: Arc::new(fake_inner),
            notifications_rx,
        };

        // The handler may run more than once, so it clones its captured state per call.
        fake.handle_request::<request::Initialize, _, _>(move |_, _| {
            let capabilities = capabilities.clone();
            let name = name.clone();
            async move {
                Ok(InitializeResult {
                    capabilities,
                    server_info: Some(ServerInfo {
                        name,
                        ..Default::default()
                    }),
                })
            }
        });

        (server, fake)
    }
}
1222
#[cfg(any(test, feature = "test-support"))]
impl LanguageServer {
    /// Returns a [`ServerCapabilities`] value for tests with the providers listed
    /// below switched on and every other field left at its default.
    pub fn full_capabilities() -> ServerCapabilities {
        ServerCapabilities {
            // Providers expressed as a plain boolean toggle.
            definition_provider: Some(OneOf::Left(true)),
            document_highlight_provider: Some(OneOf::Left(true)),
            document_formatting_provider: Some(OneOf::Left(true)),
            document_range_formatting_provider: Some(OneOf::Left(true)),
            // Providers with a dedicated capability enum.
            type_definition_provider: Some(TypeDefinitionProviderCapability::Simple(true)),
            implementation_provider: Some(ImplementationProviderCapability::Simple(true)),
            code_action_provider: Some(CodeActionProviderCapability::Simple(true)),
            ..Default::default()
        }
    }
}
1238
#[cfg(any(test, feature = "test-support"))]
impl FakeLanguageServer {
    /// See [`LanguageServer::notify`].
    ///
    /// Any error from sending the notification is discarded.
    pub fn notify<T: notification::Notification>(&self, params: T::Params) {
        self.server.notify::<T>(params).ok();
    }

    /// See [`LanguageServer::request`].
    pub async fn request<T>(&self, params: T::Params) -> Result<T::Result>
    where
        T: request::Request,
        T::Result: 'static + Send,
    {
        self.server.executor.start_waiting();
        self.server.request::<T>(params).await
    }

    /// Attempts [`Self::try_receive_notification`], unwrapping if it has not received the specified type yet.
    ///
    /// # Panics
    ///
    /// Panics if [`Self::try_receive_notification`] returns `None`.
    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
        self.server.executor.start_waiting();
        self.try_receive_notification::<T>().await.unwrap()
    }

    /// Consumes the notification channel until it finds a notification for the specified type.
    ///
    /// Notifications for other methods are logged and dropped. Returns `None` once the
    /// channel yields no further items.
    ///
    /// # Panics
    ///
    /// Panics if a matching notification's params cannot be deserialized as `T::Params`.
    pub async fn try_receive_notification<T: notification::Notification>(
        &mut self,
    ) -> Option<T::Params> {
        use futures::StreamExt as _;

        loop {
            let (method, params) = self.notifications_rx.next().await?;
            if method == T::METHOD {
                return Some(serde_json::from_str::<T::Params>(&params).unwrap());
            } else {
                log::info!("skipping message in fake language server {:?}", params);
            }
        }
    }

    /// Registers a handler for a specific kind of request. Removes any existing handler for specified request type.
    ///
    /// The returned receiver gets a `()` each time a handler future has completed,
    /// just before its result is returned. The handler itself is invoked
    /// immediately; its future is awaited after `simulate_random_delay`.
    pub fn handle_request<T, F, Fut>(
        &self,
        mut handler: F,
    ) -> futures::channel::mpsc::UnboundedReceiver<()>
    where
        T: 'static + request::Request,
        T::Params: 'static + Send,
        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> Fut,
        Fut: 'static + Send + Future<Output = Result<T::Result>>,
    {
        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
        self.server.remove_request_handler::<T>();
        self.server
            .on_request::<T, _, _>(move |params, cx| {
                let result = handler(params, cx.clone());
                let responded_tx = responded_tx.clone();
                let executor = cx.background_executor().clone();
                async move {
                    executor.simulate_random_delay().await;
                    let result = result.await;
                    // The receiver may have been dropped; that is not an error.
                    responded_tx.unbounded_send(()).ok();
                    result
                }
            })
            .detach();
        responded_rx
    }

    /// Registers a handler for a specific kind of notification. Removes any existing handler for specified notification type.
    ///
    /// The returned receiver gets a `()` after each invocation of the handler.
    pub fn handle_notification<T, F>(
        &self,
        mut handler: F,
    ) -> futures::channel::mpsc::UnboundedReceiver<()>
    where
        T: 'static + notification::Notification,
        T::Params: 'static + Send,
        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext),
    {
        let (handled_tx, handled_rx) = futures::channel::mpsc::unbounded();
        self.server.remove_notification_handler::<T>();
        self.server
            .on_notification::<T, _>(move |params, cx| {
                handler(params, cx.clone());
                // The receiver may have been dropped; that is not an error.
                handled_tx.unbounded_send(()).ok();
            })
            .detach();
        handled_rx
    }

    /// Removes any existing handler for specified request type.
    pub fn remove_request_handler<T>(&mut self)
    where
        T: 'static + request::Request,
    {
        self.server.remove_request_handler::<T>();
    }

    /// Simulate that the server has started work and notifies about its progress with the specified token.
    ///
    /// Sends a `WorkDoneProgressCreate` request for the token, then a `Progress`
    /// notification carrying a `Begin` value.
    ///
    /// # Panics
    ///
    /// Panics if the `WorkDoneProgressCreate` request fails.
    pub async fn start_progress(&self, token: impl Into<String>) {
        let token = token.into();
        self.request::<request::WorkDoneProgressCreate>(WorkDoneProgressCreateParams {
            token: NumberOrString::String(token.clone()),
        })
        .await
        .unwrap();
        self.notify::<notification::Progress>(ProgressParams {
            token: NumberOrString::String(token),
            value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
        });
    }

    /// Simulate that the server has completed work and notifies about that with the specified token.
    pub fn end_progress(&self, token: impl Into<String>) {
        self.notify::<notification::Progress>(ProgressParams {
            token: NumberOrString::String(token.into()),
            value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
        });
    }
}
1358
1359pub(self) async fn read_headers<Stdout>(
1360 reader: &mut BufReader<Stdout>,
1361 buffer: &mut Vec<u8>,
1362) -> Result<()>
1363where
1364 Stdout: AsyncRead + Unpin + Send + 'static,
1365{
1366 loop {
1367 if buffer.len() >= HEADER_DELIMITER.len()
1368 && buffer[(buffer.len() - HEADER_DELIMITER.len())..] == HEADER_DELIMITER[..]
1369 {
1370 return Ok(());
1371 }
1372
1373 if reader.read_until(b'\n', buffer).await? == 0 {
1374 return Err(anyhow!("cannot read LSP message headers"));
1375 }
1376 }
1377}
1378
#[cfg(test)]
mod tests {
    use super::*;
    use gpui::TestAppContext;

    /// Installs `env_logger` before tests run, but only when `RUST_LOG` is set.
    #[ctor::ctor]
    fn init_logger() {
        if std::env::var("RUST_LOG").is_ok() {
            env_logger::init();
        }
    }

    /// Round-trips notifications in both directions between a `LanguageServer` and
    /// its `FakeLanguageServer` counterpart, then checks that dropping the client
    /// makes the fake receive `Exit`.
    #[gpui::test]
    async fn test_fake(cx: &mut TestAppContext) {
        cx.update(|cx| {
            release_channel::init("0.0.0", cx);
        });
        let (server, mut fake) = FakeLanguageServer::new(
            LanguageServerId(0),
            LanguageServerBinary {
                path: "path/to/language-server".into(),
                arguments: vec![],
                env: None,
            },
            "the-lsp".to_string(),
            Default::default(),
            cx.to_async(),
        );

        // Forward notifications arriving at the client into channels we can await on.
        let (message_tx, message_rx) = channel::unbounded();
        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
        server
            .on_notification::<notification::ShowMessage, _>(move |params, _| {
                message_tx.try_send(params).unwrap()
            })
            .detach();
        server
            .on_notification::<notification::PublishDiagnostics, _>(move |params, _| {
                diagnostics_tx.try_send(params).unwrap()
            })
            .detach();

        // Client -> fake.
        let server = cx.update(|cx| server.initialize(None, cx)).await.unwrap();
        server
            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
                text_document: TextDocumentItem::new(
                    Url::from_str("file://a/b").unwrap(),
                    "rust".to_string(),
                    0,
                    "".to_string(),
                ),
            })
            .unwrap();
        assert_eq!(
            fake.receive_notification::<notification::DidOpenTextDocument>()
                .await
                .text_document
                .uri
                .as_str(),
            "file://a/b"
        );

        // Fake -> client.
        fake.notify::<notification::ShowMessage>(ShowMessageParams {
            typ: MessageType::ERROR,
            message: "ok".to_string(),
        });
        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
            uri: Url::from_str("file://b/c").unwrap(),
            version: Some(5),
            diagnostics: vec![],
        });
        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
        assert_eq!(
            diagnostics_rx.recv().await.unwrap().uri.as_str(),
            "file://b/c"
        );

        fake.handle_request::<request::Shutdown, _, _>(|_, _| async move { Ok(()) });

        // Dropping the client should lead to the fake receiving `Exit`.
        drop(server);
        fake.receive_notification::<notification::Exit>().await;
    }

    /// `read_headers` must stop right after the blank line that ends the header
    /// section, leaving any message body unread.
    #[gpui::test]
    async fn test_read_headers() {
        let mut buf = Vec::new();
        let mut reader = smol::io::BufReader::new(b"Content-Length: 123\r\n\r\n" as &[u8]);
        read_headers(&mut reader, &mut buf).await.unwrap();
        assert_eq!(buf, b"Content-Length: 123\r\n\r\n");

        let mut buf = Vec::new();
        let mut reader = smol::io::BufReader::new(b"Content-Type: application/vscode-jsonrpc\r\nContent-Length: 1235\r\n\r\n{\"somecontent\":123}" as &[u8]);
        read_headers(&mut reader, &mut buf).await.unwrap();
        assert_eq!(
            buf,
            b"Content-Type: application/vscode-jsonrpc\r\nContent-Length: 1235\r\n\r\n"
        );

        let mut buf = Vec::new();
        let mut reader = smol::io::BufReader::new(b"Content-Length: 1235\r\nContent-Type: application/vscode-jsonrpc\r\n\r\n{\"somecontent\":true}" as &[u8]);
        read_headers(&mut reader, &mut buf).await.unwrap();
        assert_eq!(
            buf,
            b"Content-Length: 1235\r\nContent-Type: application/vscode-jsonrpc\r\n\r\n"
        );
    }

    /// A string id made only of digits must stay a string id.
    #[gpui::test]
    fn test_deserialize_string_digit_id() {
        let json = r#"{"jsonrpc":"2.0","id":"2","method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
        let notification = serde_json::from_str::<AnyNotification>(json)
            .expect("message with string id should be parsed");
        let expected_id = RequestId::Str("2".to_string());
        assert_eq!(notification.id, Some(expected_id));
    }

    /// An arbitrary string id is accepted and preserved.
    #[gpui::test]
    fn test_deserialize_string_id() {
        let json = r#"{"jsonrpc":"2.0","id":"anythingAtAll","method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
        let notification = serde_json::from_str::<AnyNotification>(json)
            .expect("message with string id should be parsed");
        let expected_id = RequestId::Str("anythingAtAll".to_string());
        assert_eq!(notification.id, Some(expected_id));
    }

    /// An integer id is accepted and preserved as an integer.
    #[gpui::test]
    fn test_deserialize_int_id() {
        let json = r#"{"jsonrpc":"2.0","id":2,"method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
        let notification = serde_json::from_str::<AnyNotification>(json)
            .expect("message with int id should be parsed");
        let expected_id = RequestId::Int(2);
        assert_eq!(notification.id, Some(expected_id));
    }

    #[test]
    fn test_serialize_has_no_nulls() {
        // Ensure we're not setting both result and error variants. (ticket #10595)
        let no_tag = Response::<u32> {
            jsonrpc: "",
            id: RequestId::Int(0),
            value: LspResult::Ok(None),
        };
        assert_eq!(
            serde_json::to_string(&no_tag).unwrap(),
            "{\"jsonrpc\":\"\",\"id\":0,\"result\":null}"
        );
        let no_tag = Response::<u32> {
            jsonrpc: "",
            id: RequestId::Int(0),
            value: LspResult::Error(None),
        };
        assert_eq!(
            serde_json::to_string(&no_tag).unwrap(),
            "{\"jsonrpc\":\"\",\"id\":0,\"error\":null}"
        );
    }
}