lsp.rs

   1use log::warn;
   2pub use lsp_types::request::*;
   3pub use lsp_types::*;
   4
   5use anyhow::{anyhow, Context, Result};
   6use collections::HashMap;
   7use futures::{channel::oneshot, io::BufWriter, select, AsyncRead, AsyncWrite, FutureExt};
   8use gpui::{AppContext, AsyncAppContext, BackgroundExecutor, Task};
   9use parking_lot::Mutex;
  10use postage::{barrier, prelude::Stream};
  11use serde::{de::DeserializeOwned, Deserialize, Serialize};
  12use serde_json::{json, value::RawValue, Value};
  13use smol::{
  14    channel,
  15    io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
  16    process::{self, Child},
  17};
  18use std::{
  19    ffi::OsString,
  20    fmt,
  21    future::Future,
  22    io::Write,
  23    path::PathBuf,
  24    str::{self, FromStr as _},
  25    sync::{
  26        atomic::{AtomicI32, Ordering::SeqCst},
  27        Arc, Weak,
  28    },
  29    time::{Duration, Instant},
  30};
  31use std::{path::Path, process::Stdio};
  32use util::{ResultExt, TryFutureExt};
  33
  34const HEADER_DELIMITER: &'static [u8; 4] = b"\r\n\r\n";
  35const JSON_RPC_VERSION: &str = "2.0";
  36const CONTENT_LEN_HEADER: &str = "Content-Length: ";
  37const LSP_REQUEST_TIMEOUT: Duration = Duration::from_secs(60 * 2);
  38const SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
  39
  40type NotificationHandler = Box<dyn Send + FnMut(Option<RequestId>, &str, AsyncAppContext)>;
  41type ResponseHandler = Box<dyn Send + FnOnce(Result<String, Error>)>;
  42type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
  43
  44/// Kind of language server stdio given to an IO handler.
  45#[derive(Debug, Clone, Copy)]
  46pub enum IoKind {
  47    StdOut,
  48    StdIn,
  49    StdErr,
  50}
  51
  52/// Represents a launchable language server. This can either be a standalone binary or the path
  53/// to a runtime with arguments to instruct it to launch the actual language server file.
  54#[derive(Debug, Clone, Deserialize)]
  55pub struct LanguageServerBinary {
  56    pub path: PathBuf,
  57    pub arguments: Vec<OsString>,
  58}
  59
  60/// A running language server process.
  61pub struct LanguageServer {
  62    server_id: LanguageServerId,
  63    next_id: AtomicI32,
  64    outbound_tx: channel::Sender<String>,
  65    name: Arc<str>,
  66    capabilities: ServerCapabilities,
  67    code_action_kinds: Option<Vec<CodeActionKind>>,
  68    notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
  69    response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
  70    io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
  71    executor: BackgroundExecutor,
  72    #[allow(clippy::type_complexity)]
  73    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
  74    output_done_rx: Mutex<Option<barrier::Receiver>>,
  75    root_path: PathBuf,
  76    server: Arc<Mutex<Option<Child>>>,
  77}
  78
  79/// Identifies a running language server.
  80#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
  81#[repr(transparent)]
  82pub struct LanguageServerId(pub usize);
  83
  84/// Handle to a language server RPC activity subscription.
  85pub enum Subscription {
  86    Notification {
  87        method: &'static str,
  88        notification_handlers: Option<Arc<Mutex<HashMap<&'static str, NotificationHandler>>>>,
  89    },
  90    Io {
  91        id: i32,
  92        io_handlers: Option<Weak<Mutex<HashMap<i32, IoHandler>>>>,
  93    },
  94}
  95
  96/// Language server protocol RPC request message ID.
  97///
  98/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
  99#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
 100#[serde(untagged)]
 101pub enum RequestId {
 102    Int(i32),
 103    Str(String),
 104}
 105
 106/// Language server protocol RPC request message.
 107///
 108/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
 109#[derive(Serialize, Deserialize)]
 110pub struct Request<'a, T> {
 111    jsonrpc: &'static str,
 112    id: RequestId,
 113    method: &'a str,
 114    params: T,
 115}
 116
 117/// Language server protocol RPC request response message before it is deserialized into a concrete type.
 118#[derive(Serialize, Deserialize)]
 119struct AnyResponse<'a> {
 120    jsonrpc: &'a str,
 121    id: RequestId,
 122    #[serde(default)]
 123    error: Option<Error>,
 124    #[serde(borrow)]
 125    result: Option<&'a RawValue>,
 126}
 127
 128/// Language server protocol RPC request response message.
 129///
 130/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#responseMessage)
 131#[derive(Serialize)]
 132struct Response<T> {
 133    jsonrpc: &'static str,
 134    id: RequestId,
 135    result: Option<T>,
 136    error: Option<Error>,
 137}
 138
 139/// Language server protocol RPC notification message.
 140///
 141/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
 142#[derive(Serialize, Deserialize)]
 143struct Notification<'a, T> {
 144    jsonrpc: &'static str,
 145    #[serde(borrow)]
 146    method: &'a str,
 147    params: T,
 148}
 149
 150/// Language server RPC notification message before it is deserialized into a concrete type.
 151#[derive(Debug, Clone, Deserialize)]
 152struct AnyNotification<'a> {
 153    #[serde(default)]
 154    id: Option<RequestId>,
 155    #[serde(borrow)]
 156    method: &'a str,
 157    #[serde(borrow, default)]
 158    params: Option<&'a RawValue>,
 159}
 160
 161#[derive(Debug, Serialize, Deserialize)]
 162struct Error {
 163    message: String,
 164}
 165
 166impl LanguageServer {
 167    /// Starts a language server process.
 168    pub fn new(
 169        stderr_capture: Arc<Mutex<Option<String>>>,
 170        server_id: LanguageServerId,
 171        binary: LanguageServerBinary,
 172        root_path: &Path,
 173        code_action_kinds: Option<Vec<CodeActionKind>>,
 174        cx: AsyncAppContext,
 175    ) -> Result<Self> {
 176        let working_dir = if root_path.is_dir() {
 177            root_path
 178        } else {
 179            root_path.parent().unwrap_or_else(|| Path::new("/"))
 180        };
 181
 182        log::info!(
 183            "starting language server. binary path: {:?}, working directory: {:?}, args: {:?}",
 184            binary.path,
 185            working_dir,
 186            &binary.arguments
 187        );
 188
 189        let mut server = process::Command::new(&binary.path)
 190            .current_dir(working_dir)
 191            .args(binary.arguments)
 192            .stdin(Stdio::piped())
 193            .stdout(Stdio::piped())
 194            .stderr(Stdio::piped())
 195            .kill_on_drop(true)
 196            .spawn()?;
 197
 198        let stdin = server.stdin.take().unwrap();
 199        let stdout = server.stdout.take().unwrap();
 200        let stderr = server.stderr.take().unwrap();
 201        let mut server = Self::new_internal(
 202            server_id.clone(),
 203            stdin,
 204            stdout,
 205            Some(stderr),
 206            stderr_capture,
 207            Some(server),
 208            root_path,
 209            code_action_kinds,
 210            cx,
 211            move |notification| {
 212                log::info!(
 213                    "Language server with id {} sent unhandled notification {}:\n{}",
 214                    server_id,
 215                    notification.method,
 216                    serde_json::to_string_pretty(
 217                        &notification
 218                            .params
 219                            .and_then(|params| Value::from_str(params.get()).ok())
 220                            .unwrap_or(Value::Null)
 221                    )
 222                    .unwrap(),
 223                );
 224            },
 225        );
 226
 227        if let Some(name) = binary.path.file_name() {
 228            server.name = name.to_string_lossy().into();
 229        }
 230
 231        Ok(server)
 232    }
 233
 234    fn new_internal<Stdin, Stdout, Stderr, F>(
 235        server_id: LanguageServerId,
 236        stdin: Stdin,
 237        stdout: Stdout,
 238        stderr: Option<Stderr>,
 239        stderr_capture: Arc<Mutex<Option<String>>>,
 240        server: Option<Child>,
 241        root_path: &Path,
 242        code_action_kinds: Option<Vec<CodeActionKind>>,
 243        cx: AsyncAppContext,
 244        on_unhandled_notification: F,
 245    ) -> Self
 246    where
 247        Stdin: AsyncWrite + Unpin + Send + 'static,
 248        Stdout: AsyncRead + Unpin + Send + 'static,
 249        Stderr: AsyncRead + Unpin + Send + 'static,
 250        F: FnMut(AnyNotification) + 'static + Send + Sync + Clone,
 251    {
 252        let (outbound_tx, outbound_rx) = channel::unbounded::<String>();
 253        let (output_done_tx, output_done_rx) = barrier::channel();
 254        let notification_handlers =
 255            Arc::new(Mutex::new(HashMap::<_, NotificationHandler>::default()));
 256        let response_handlers =
 257            Arc::new(Mutex::new(Some(HashMap::<_, ResponseHandler>::default())));
 258        let io_handlers = Arc::new(Mutex::new(HashMap::default()));
 259
 260        let stdout_input_task = cx.spawn({
 261            let on_unhandled_notification = on_unhandled_notification.clone();
 262            let notification_handlers = notification_handlers.clone();
 263            let response_handlers = response_handlers.clone();
 264            let io_handlers = io_handlers.clone();
 265            move |cx| {
 266                Self::handle_input(
 267                    stdout,
 268                    on_unhandled_notification,
 269                    notification_handlers,
 270                    response_handlers,
 271                    io_handlers,
 272                    cx,
 273                )
 274                .log_err()
 275            }
 276        });
 277        let stderr_input_task = stderr
 278            .map(|stderr| {
 279                let io_handlers = io_handlers.clone();
 280                let stderr_captures = stderr_capture.clone();
 281                cx.spawn(|_| Self::handle_stderr(stderr, io_handlers, stderr_captures).log_err())
 282            })
 283            .unwrap_or_else(|| Task::Ready(Some(None)));
 284        let input_task = cx.spawn(|_| async move {
 285            let (stdout, stderr) = futures::join!(stdout_input_task, stderr_input_task);
 286            stdout.or(stderr)
 287        });
 288        let output_task = cx.background_executor().spawn({
 289            Self::handle_output(
 290                stdin,
 291                outbound_rx,
 292                output_done_tx,
 293                response_handlers.clone(),
 294                io_handlers.clone(),
 295            )
 296            .log_err()
 297        });
 298
 299        Self {
 300            server_id,
 301            notification_handlers,
 302            response_handlers,
 303            io_handlers,
 304            name: "".into(),
 305            capabilities: Default::default(),
 306            code_action_kinds,
 307            next_id: Default::default(),
 308            outbound_tx,
 309            executor: cx.background_executor().clone(),
 310            io_tasks: Mutex::new(Some((input_task, output_task))),
 311            output_done_rx: Mutex::new(Some(output_done_rx)),
 312            root_path: root_path.to_path_buf(),
 313            server: Arc::new(Mutex::new(server.map(|server| server))),
 314        }
 315    }
 316
 317    /// List of code action kinds this language server reports being able to emit.
 318    pub fn code_action_kinds(&self) -> Option<Vec<CodeActionKind>> {
 319        self.code_action_kinds.clone()
 320    }
 321
 322    async fn handle_input<Stdout, F>(
 323        stdout: Stdout,
 324        mut on_unhandled_notification: F,
 325        notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
 326        response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
 327        io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 328        cx: AsyncAppContext,
 329    ) -> anyhow::Result<()>
 330    where
 331        Stdout: AsyncRead + Unpin + Send + 'static,
 332        F: FnMut(AnyNotification) + 'static + Send,
 333    {
 334        let mut stdout = BufReader::new(stdout);
 335        let _clear_response_handlers = util::defer({
 336            let response_handlers = response_handlers.clone();
 337            move || {
 338                response_handlers.lock().take();
 339            }
 340        });
 341        let mut buffer = Vec::new();
 342        loop {
 343            buffer.clear();
 344
 345            read_headers(&mut stdout, &mut buffer).await?;
 346
 347            let headers = std::str::from_utf8(&buffer)?;
 348
 349            let message_len = headers
 350                .split("\n")
 351                .find(|line| line.starts_with(CONTENT_LEN_HEADER))
 352                .and_then(|line| line.strip_prefix(CONTENT_LEN_HEADER))
 353                .ok_or_else(|| anyhow!("invalid LSP message header {headers:?}"))?
 354                .trim_end()
 355                .parse()?;
 356
 357            buffer.resize(message_len, 0);
 358            stdout.read_exact(&mut buffer).await?;
 359
 360            if let Ok(message) = str::from_utf8(&buffer) {
 361                log::trace!("incoming message: {message}");
 362                for handler in io_handlers.lock().values_mut() {
 363                    handler(IoKind::StdOut, message);
 364                }
 365            }
 366
 367            if let Ok(msg) = serde_json::from_slice::<AnyNotification>(&buffer) {
 368                if let Some(handler) = notification_handlers.lock().get_mut(msg.method) {
 369                    handler(
 370                        msg.id,
 371                        msg.params.map(|params| params.get()).unwrap_or("null"),
 372                        cx.clone(),
 373                    );
 374                } else {
 375                    on_unhandled_notification(msg);
 376                }
 377            } else if let Ok(AnyResponse {
 378                id, error, result, ..
 379            }) = serde_json::from_slice(&buffer)
 380            {
 381                if let Some(handler) = response_handlers
 382                    .lock()
 383                    .as_mut()
 384                    .and_then(|handlers| handlers.remove(&id))
 385                {
 386                    if let Some(error) = error {
 387                        handler(Err(error));
 388                    } else if let Some(result) = result {
 389                        handler(Ok(result.get().into()));
 390                    } else {
 391                        handler(Ok("null".into()));
 392                    }
 393                }
 394            } else {
 395                warn!(
 396                    "failed to deserialize LSP message:\n{}",
 397                    std::str::from_utf8(&buffer)?
 398                );
 399            }
 400
 401            // Don't starve the main thread when receiving lots of messages at once.
 402            smol::future::yield_now().await;
 403        }
 404    }
 405
 406    async fn handle_stderr<Stderr>(
 407        stderr: Stderr,
 408        io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 409        stderr_capture: Arc<Mutex<Option<String>>>,
 410    ) -> anyhow::Result<()>
 411    where
 412        Stderr: AsyncRead + Unpin + Send + 'static,
 413    {
 414        let mut stderr = BufReader::new(stderr);
 415        let mut buffer = Vec::new();
 416
 417        loop {
 418            buffer.clear();
 419
 420            let bytes_read = stderr.read_until(b'\n', &mut buffer).await?;
 421            if bytes_read == 0 {
 422                return Ok(());
 423            }
 424
 425            if let Ok(message) = str::from_utf8(&buffer) {
 426                log::trace!("incoming stderr message:{message}");
 427                for handler in io_handlers.lock().values_mut() {
 428                    handler(IoKind::StdErr, message);
 429                }
 430
 431                if let Some(stderr) = stderr_capture.lock().as_mut() {
 432                    stderr.push_str(message);
 433                }
 434            }
 435
 436            // Don't starve the main thread when receiving lots of messages at once.
 437            smol::future::yield_now().await;
 438        }
 439    }
 440
 441    async fn handle_output<Stdin>(
 442        stdin: Stdin,
 443        outbound_rx: channel::Receiver<String>,
 444        output_done_tx: barrier::Sender,
 445        response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
 446        io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 447    ) -> anyhow::Result<()>
 448    where
 449        Stdin: AsyncWrite + Unpin + Send + 'static,
 450    {
 451        let mut stdin = BufWriter::new(stdin);
 452        let _clear_response_handlers = util::defer({
 453            let response_handlers = response_handlers.clone();
 454            move || {
 455                response_handlers.lock().take();
 456            }
 457        });
 458        let mut content_len_buffer = Vec::new();
 459        while let Ok(message) = outbound_rx.recv().await {
 460            log::trace!("outgoing message:{}", message);
 461            for handler in io_handlers.lock().values_mut() {
 462                handler(IoKind::StdIn, &message);
 463            }
 464
 465            content_len_buffer.clear();
 466            write!(content_len_buffer, "{}", message.len()).unwrap();
 467            stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
 468            stdin.write_all(&content_len_buffer).await?;
 469            stdin.write_all("\r\n\r\n".as_bytes()).await?;
 470            stdin.write_all(message.as_bytes()).await?;
 471            stdin.flush().await?;
 472        }
 473        drop(output_done_tx);
 474        Ok(())
 475    }
 476
 477    /// Initializes a language server by sending the `Initialize` request.
 478    /// Note that `options` is used directly to construct [`InitializeParams`], which is why it is owned.
 479    ///
 480    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#initialize)
 481    pub fn initialize(
 482        mut self,
 483        options: Option<Value>,
 484        cx: &AppContext,
 485    ) -> Task<Result<Arc<Self>>> {
 486        let root_uri = Url::from_file_path(&self.root_path).unwrap();
 487        #[allow(deprecated)]
 488        let params = InitializeParams {
 489            process_id: None,
 490            root_path: None,
 491            root_uri: Some(root_uri.clone()),
 492            initialization_options: options,
 493            capabilities: ClientCapabilities {
 494                workspace: Some(WorkspaceClientCapabilities {
 495                    configuration: Some(true),
 496                    did_change_watched_files: Some(DidChangeWatchedFilesClientCapabilities {
 497                        dynamic_registration: Some(true),
 498                        relative_pattern_support: Some(true),
 499                    }),
 500                    did_change_configuration: Some(DynamicRegistrationClientCapabilities {
 501                        dynamic_registration: Some(true),
 502                    }),
 503                    workspace_folders: Some(true),
 504                    symbol: Some(WorkspaceSymbolClientCapabilities {
 505                        resolve_support: None,
 506                        ..WorkspaceSymbolClientCapabilities::default()
 507                    }),
 508                    inlay_hint: Some(InlayHintWorkspaceClientCapabilities {
 509                        refresh_support: Some(true),
 510                    }),
 511                    diagnostic: Some(DiagnosticWorkspaceClientCapabilities {
 512                        refresh_support: None,
 513                    }),
 514                    workspace_edit: Some(WorkspaceEditClientCapabilities {
 515                        resource_operations: Some(vec![
 516                            ResourceOperationKind::Create,
 517                            ResourceOperationKind::Rename,
 518                            ResourceOperationKind::Delete,
 519                        ]),
 520                        document_changes: Some(true),
 521                        ..WorkspaceEditClientCapabilities::default()
 522                    }),
 523                    ..Default::default()
 524                }),
 525                text_document: Some(TextDocumentClientCapabilities {
 526                    definition: Some(GotoCapability {
 527                        link_support: Some(true),
 528                        dynamic_registration: None,
 529                    }),
 530                    code_action: Some(CodeActionClientCapabilities {
 531                        code_action_literal_support: Some(CodeActionLiteralSupport {
 532                            code_action_kind: CodeActionKindLiteralSupport {
 533                                value_set: vec![
 534                                    CodeActionKind::REFACTOR.as_str().into(),
 535                                    CodeActionKind::QUICKFIX.as_str().into(),
 536                                    CodeActionKind::SOURCE.as_str().into(),
 537                                ],
 538                            },
 539                        }),
 540                        data_support: Some(true),
 541                        resolve_support: Some(CodeActionCapabilityResolveSupport {
 542                            properties: vec!["edit".to_string(), "command".to_string()],
 543                        }),
 544                        ..Default::default()
 545                    }),
 546                    completion: Some(CompletionClientCapabilities {
 547                        completion_item: Some(CompletionItemCapability {
 548                            snippet_support: Some(true),
 549                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
 550                                properties: vec![
 551                                    "documentation".to_string(),
 552                                    "additionalTextEdits".to_string(),
 553                                ],
 554                            }),
 555                            ..Default::default()
 556                        }),
 557                        completion_list: Some(CompletionListCapability {
 558                            item_defaults: Some(vec![
 559                                "commitCharacters".to_owned(),
 560                                "editRange".to_owned(),
 561                                "insertTextMode".to_owned(),
 562                                "data".to_owned(),
 563                            ]),
 564                        }),
 565                        ..Default::default()
 566                    }),
 567                    rename: Some(RenameClientCapabilities {
 568                        prepare_support: Some(true),
 569                        ..Default::default()
 570                    }),
 571                    hover: Some(HoverClientCapabilities {
 572                        content_format: Some(vec![MarkupKind::Markdown]),
 573                        dynamic_registration: None,
 574                    }),
 575                    inlay_hint: Some(InlayHintClientCapabilities {
 576                        resolve_support: Some(InlayHintResolveClientCapabilities {
 577                            properties: vec![
 578                                "textEdits".to_string(),
 579                                "tooltip".to_string(),
 580                                "label.tooltip".to_string(),
 581                                "label.location".to_string(),
 582                                "label.command".to_string(),
 583                            ],
 584                        }),
 585                        dynamic_registration: Some(false),
 586                    }),
 587                    publish_diagnostics: Some(PublishDiagnosticsClientCapabilities {
 588                        related_information: Some(true),
 589                        ..Default::default()
 590                    }),
 591                    formatting: Some(DynamicRegistrationClientCapabilities {
 592                        dynamic_registration: None,
 593                    }),
 594                    on_type_formatting: Some(DynamicRegistrationClientCapabilities {
 595                        dynamic_registration: None,
 596                    }),
 597                    diagnostic: Some(DiagnosticClientCapabilities {
 598                        related_document_support: Some(true),
 599                        dynamic_registration: None,
 600                    }),
 601                    ..Default::default()
 602                }),
 603                experimental: Some(json!({
 604                    "serverStatusNotification": true,
 605                })),
 606                window: Some(WindowClientCapabilities {
 607                    work_done_progress: Some(true),
 608                    ..Default::default()
 609                }),
 610                general: None,
 611            },
 612            trace: None,
 613            workspace_folders: Some(vec![WorkspaceFolder {
 614                uri: root_uri,
 615                name: Default::default(),
 616            }]),
 617            client_info: Some(ClientInfo {
 618                name: release_channel::ReleaseChannel::global(cx)
 619                    .display_name()
 620                    .to_string(),
 621                version: Some(release_channel::AppVersion::global(cx).to_string()),
 622            }),
 623            locale: None,
 624        };
 625
 626        cx.spawn(|_| async move {
 627            let response = self.request::<request::Initialize>(params).await?;
 628            if let Some(info) = response.server_info {
 629                self.name = info.name.into();
 630            }
 631            self.capabilities = response.capabilities;
 632
 633            self.notify::<notification::Initialized>(InitializedParams {})?;
 634            Ok(Arc::new(self))
 635        })
 636    }
 637
 638    /// Sends a shutdown request to the language server process and prepares the [`LanguageServer`] to be dropped.
 639    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
 640        if let Some(tasks) = self.io_tasks.lock().take() {
 641            let response_handlers = self.response_handlers.clone();
 642            let next_id = AtomicI32::new(self.next_id.load(SeqCst));
 643            let outbound_tx = self.outbound_tx.clone();
 644            let executor = self.executor.clone();
 645            let mut output_done = self.output_done_rx.lock().take().unwrap();
 646            let shutdown_request = Self::request_internal::<request::Shutdown>(
 647                &next_id,
 648                &response_handlers,
 649                &outbound_tx,
 650                &executor,
 651                (),
 652            );
 653            let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
 654            outbound_tx.close();
 655
 656            let server = self.server.clone();
 657            let name = self.name.clone();
 658            let mut timer = self.executor.timer(SERVER_SHUTDOWN_TIMEOUT).fuse();
 659            Some(
 660                async move {
 661                    log::debug!("language server shutdown started");
 662
 663                    select! {
 664                        request_result = shutdown_request.fuse() => {
 665                            request_result?;
 666                        }
 667
 668                        _ = timer => {
 669                            log::info!("timeout waiting for language server {name} to shutdown");
 670                        },
 671                    }
 672
 673                    response_handlers.lock().take();
 674                    exit?;
 675                    output_done.recv().await;
 676                    server.lock().take().map(|mut child| child.kill());
 677                    log::debug!("language server shutdown finished");
 678
 679                    drop(tasks);
 680                    anyhow::Ok(())
 681                }
 682                .log_err(),
 683            )
 684        } else {
 685            None
 686        }
 687    }
 688
 689    /// Register a handler to handle incoming LSP notifications.
 690    ///
 691    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
 692    #[must_use]
 693    pub fn on_notification<T, F>(&self, f: F) -> Subscription
 694    where
 695        T: notification::Notification,
 696        F: 'static + Send + FnMut(T::Params, AsyncAppContext),
 697    {
 698        self.on_custom_notification(T::METHOD, f)
 699    }
 700
 701    /// Register a handler to handle incoming LSP requests.
 702    ///
 703    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
 704    #[must_use]
 705    pub fn on_request<T, F, Fut>(&self, f: F) -> Subscription
 706    where
 707        T: request::Request,
 708        T::Params: 'static + Send,
 709        F: 'static + FnMut(T::Params, AsyncAppContext) -> Fut + Send,
 710        Fut: 'static + Future<Output = Result<T::Result>>,
 711    {
 712        self.on_custom_request(T::METHOD, f)
 713    }
 714
 715    /// Registers a handler to inspect all language server process stdio.
 716    #[must_use]
 717    pub fn on_io<F>(&self, f: F) -> Subscription
 718    where
 719        F: 'static + Send + FnMut(IoKind, &str),
 720    {
 721        let id = self.next_id.fetch_add(1, SeqCst);
 722        self.io_handlers.lock().insert(id, Box::new(f));
 723        Subscription::Io {
 724            id,
 725            io_handlers: Some(Arc::downgrade(&self.io_handlers)),
 726        }
 727    }
 728
 729    /// Removes a request handler registers via [`Self::on_request`].
 730    pub fn remove_request_handler<T: request::Request>(&self) {
 731        self.notification_handlers.lock().remove(T::METHOD);
 732    }
 733
 734    /// Removes a notification handler registers via [`Self::on_notification`].
 735    pub fn remove_notification_handler<T: notification::Notification>(&self) {
 736        self.notification_handlers.lock().remove(T::METHOD);
 737    }
 738
 739    /// Checks if a notification handler has been registered via [`Self::on_notification`].
 740    pub fn has_notification_handler<T: notification::Notification>(&self) -> bool {
 741        self.notification_handlers.lock().contains_key(T::METHOD)
 742    }
 743
 744    #[must_use]
 745    fn on_custom_notification<Params, F>(&self, method: &'static str, mut f: F) -> Subscription
 746    where
 747        F: 'static + FnMut(Params, AsyncAppContext) + Send,
 748        Params: DeserializeOwned,
 749    {
 750        let prev_handler = self.notification_handlers.lock().insert(
 751            method,
 752            Box::new(move |_, params, cx| {
 753                if let Some(params) = serde_json::from_str(params).log_err() {
 754                    f(params, cx);
 755                }
 756            }),
 757        );
 758        assert!(
 759            prev_handler.is_none(),
 760            "registered multiple handlers for the same LSP method"
 761        );
 762        Subscription::Notification {
 763            method,
 764            notification_handlers: Some(self.notification_handlers.clone()),
 765        }
 766    }
 767
 768    #[must_use]
 769    fn on_custom_request<Params, Res, Fut, F>(&self, method: &'static str, mut f: F) -> Subscription
 770    where
 771        F: 'static + FnMut(Params, AsyncAppContext) -> Fut + Send,
 772        Fut: 'static + Future<Output = Result<Res>>,
 773        Params: DeserializeOwned + Send + 'static,
 774        Res: Serialize,
 775    {
 776        let outbound_tx = self.outbound_tx.clone();
 777        let prev_handler = self.notification_handlers.lock().insert(
 778            method,
 779            Box::new(move |id, params, cx| {
 780                if let Some(id) = id {
 781                    match serde_json::from_str(params) {
 782                        Ok(params) => {
 783                            let response = f(params, cx.clone());
 784                            cx.foreground_executor()
 785                                .spawn({
 786                                    let outbound_tx = outbound_tx.clone();
 787                                    async move {
 788                                        let response = match response.await {
 789                                            Ok(result) => Response {
 790                                                jsonrpc: JSON_RPC_VERSION,
 791                                                id,
 792                                                result: Some(result),
 793                                                error: None,
 794                                            },
 795                                            Err(error) => Response {
 796                                                jsonrpc: JSON_RPC_VERSION,
 797                                                id,
 798                                                result: None,
 799                                                error: Some(Error {
 800                                                    message: error.to_string(),
 801                                                }),
 802                                            },
 803                                        };
 804                                        if let Some(response) =
 805                                            serde_json::to_string(&response).log_err()
 806                                        {
 807                                            outbound_tx.try_send(response).ok();
 808                                        }
 809                                    }
 810                                })
 811                                .detach();
 812                        }
 813
 814                        Err(error) => {
 815                            log::error!(
 816                                "error deserializing {} request: {:?}, message: {:?}",
 817                                method,
 818                                error,
 819                                params
 820                            );
 821                            let response = AnyResponse {
 822                                jsonrpc: JSON_RPC_VERSION,
 823                                id,
 824                                result: None,
 825                                error: Some(Error {
 826                                    message: error.to_string(),
 827                                }),
 828                            };
 829                            if let Some(response) = serde_json::to_string(&response).log_err() {
 830                                outbound_tx.try_send(response).ok();
 831                            }
 832                        }
 833                    }
 834                }
 835            }),
 836        );
 837        assert!(
 838            prev_handler.is_none(),
 839            "registered multiple handlers for the same LSP method"
 840        );
 841        Subscription::Notification {
 842            method,
 843            notification_handlers: Some(self.notification_handlers.clone()),
 844        }
 845    }
 846
 847    /// Get the name of the running language server.
 848    pub fn name(&self) -> &str {
 849        &self.name
 850    }
 851
 852    /// Get the reported capabilities of the running language server.
 853    pub fn capabilities(&self) -> &ServerCapabilities {
 854        &self.capabilities
 855    }
 856
 857    /// Get the id of the running language server.
 858    pub fn server_id(&self) -> LanguageServerId {
 859        self.server_id
 860    }
 861
 862    /// Get the root path of the project the language server is running against.
 863    pub fn root_path(&self) -> &PathBuf {
 864        &self.root_path
 865    }
 866
 867    /// Sends a RPC request to the language server.
 868    ///
 869    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
 870    pub fn request<T: request::Request>(
 871        &self,
 872        params: T::Params,
 873    ) -> impl Future<Output = Result<T::Result>>
 874    where
 875        T::Result: 'static + Send,
 876    {
 877        Self::request_internal::<T>(
 878            &self.next_id,
 879            &self.response_handlers,
 880            &self.outbound_tx,
 881            &self.executor,
 882            params,
 883        )
 884    }
 885
 886    fn request_internal<T: request::Request>(
 887        next_id: &AtomicI32,
 888        response_handlers: &Mutex<Option<HashMap<RequestId, ResponseHandler>>>,
 889        outbound_tx: &channel::Sender<String>,
 890        executor: &BackgroundExecutor,
 891        params: T::Params,
 892    ) -> impl 'static + Future<Output = anyhow::Result<T::Result>>
 893    where
 894        T::Result: 'static + Send,
 895    {
 896        let id = next_id.fetch_add(1, SeqCst);
 897        let message = serde_json::to_string(&Request {
 898            jsonrpc: JSON_RPC_VERSION,
 899            id: RequestId::Int(id),
 900            method: T::METHOD,
 901            params,
 902        })
 903        .unwrap();
 904
 905        let (tx, rx) = oneshot::channel();
 906        let handle_response = response_handlers
 907            .lock()
 908            .as_mut()
 909            .ok_or_else(|| anyhow!("server shut down"))
 910            .map(|handlers| {
 911                let executor = executor.clone();
 912                handlers.insert(
 913                    RequestId::Int(id),
 914                    Box::new(move |result| {
 915                        executor
 916                            .spawn(async move {
 917                                let response = match result {
 918                                    Ok(response) => match serde_json::from_str(&response) {
 919                                        Ok(deserialized) => Ok(deserialized),
 920                                        Err(error) => {
 921                                            log::error!("failed to deserialize response from language server: {}. response from language server: {:?}", error, response);
 922                                            Err(error).context("failed to deserialize response")
 923                                        }
 924                                    }
 925                                    Err(error) => Err(anyhow!("{}", error.message)),
 926                                };
 927                                _ = tx.send(response);
 928                            })
 929                            .detach();
 930                    }),
 931                );
 932            });
 933
 934        let send = outbound_tx
 935            .try_send(message)
 936            .context("failed to write to language server's stdin");
 937
 938        let outbound_tx = outbound_tx.downgrade();
 939        let mut timeout = executor.timer(LSP_REQUEST_TIMEOUT).fuse();
 940        let started = Instant::now();
 941        async move {
 942            handle_response?;
 943            send?;
 944
 945            let cancel_on_drop = util::defer(move || {
 946                if let Some(outbound_tx) = outbound_tx.upgrade() {
 947                    Self::notify_internal::<notification::Cancel>(
 948                        &outbound_tx,
 949                        CancelParams {
 950                            id: NumberOrString::Number(id as i32),
 951                        },
 952                    )
 953                    .log_err();
 954                }
 955            });
 956
 957            let method = T::METHOD;
 958            select! {
 959                response = rx.fuse() => {
 960                    let elapsed = started.elapsed();
 961                    log::trace!("Took {elapsed:?} to receive response to {method:?} id {id}");
 962                    cancel_on_drop.abort();
 963                    response?
 964                }
 965
 966                _ = timeout => {
 967                    log::error!("Cancelled LSP request task for {method:?} id {id} which took over {LSP_REQUEST_TIMEOUT:?}");
 968                    anyhow::bail!("LSP request timeout");
 969                }
 970            }
 971        }
 972    }
 973
 974    /// Sends a RPC notification to the language server.
 975    ///
 976    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
 977    pub fn notify<T: notification::Notification>(&self, params: T::Params) -> Result<()> {
 978        Self::notify_internal::<T>(&self.outbound_tx, params)
 979    }
 980
 981    fn notify_internal<T: notification::Notification>(
 982        outbound_tx: &channel::Sender<String>,
 983        params: T::Params,
 984    ) -> Result<()> {
 985        let message = serde_json::to_string(&Notification {
 986            jsonrpc: JSON_RPC_VERSION,
 987            method: T::METHOD,
 988            params,
 989        })
 990        .unwrap();
 991        outbound_tx.try_send(message)?;
 992        Ok(())
 993    }
 994}
 995
 996impl Drop for LanguageServer {
 997    fn drop(&mut self) {
 998        if let Some(shutdown) = self.shutdown() {
 999            self.executor.spawn(shutdown).detach();
1000        }
1001    }
1002}
1003
1004impl Subscription {
1005    /// Detaching a subscription handle prevents it from unsubscribing on drop.
1006    pub fn detach(&mut self) {
1007        match self {
1008            Subscription::Notification {
1009                notification_handlers,
1010                ..
1011            } => *notification_handlers = None,
1012            Subscription::Io { io_handlers, .. } => *io_handlers = None,
1013        }
1014    }
1015}
1016
1017impl fmt::Display for LanguageServerId {
1018    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1019        self.0.fmt(f)
1020    }
1021}
1022
1023impl fmt::Debug for LanguageServer {
1024    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1025        f.debug_struct("LanguageServer")
1026            .field("id", &self.server_id.0)
1027            .field("name", &self.name)
1028            .finish_non_exhaustive()
1029    }
1030}
1031
1032impl Drop for Subscription {
1033    fn drop(&mut self) {
1034        match self {
1035            Subscription::Notification {
1036                method,
1037                notification_handlers,
1038            } => {
1039                if let Some(handlers) = notification_handlers {
1040                    handlers.lock().remove(method);
1041                }
1042            }
1043            Subscription::Io { id, io_handlers } => {
1044                if let Some(io_handlers) = io_handlers.as_ref().and_then(|h| h.upgrade()) {
1045                    io_handlers.lock().remove(id);
1046                }
1047            }
1048        }
1049    }
1050}
1051
1052/// Mock language server for use in tests.
1053#[cfg(any(test, feature = "test-support"))]
1054#[derive(Clone)]
1055pub struct FakeLanguageServer {
1056    pub server: Arc<LanguageServer>,
1057    notifications_rx: channel::Receiver<(String, String)>,
1058}
1059
1060#[cfg(any(test, feature = "test-support"))]
1061impl FakeLanguageServer {
1062    /// Construct a fake language server.
1063    pub fn new(
1064        name: String,
1065        capabilities: ServerCapabilities,
1066        cx: AsyncAppContext,
1067    ) -> (LanguageServer, FakeLanguageServer) {
1068        let (stdin_writer, stdin_reader) = async_pipe::pipe();
1069        let (stdout_writer, stdout_reader) = async_pipe::pipe();
1070        let (notifications_tx, notifications_rx) = channel::unbounded();
1071
1072        let server = LanguageServer::new_internal(
1073            LanguageServerId(0),
1074            stdin_writer,
1075            stdout_reader,
1076            None::<async_pipe::PipeReader>,
1077            Arc::new(Mutex::new(None)),
1078            None,
1079            Path::new("/"),
1080            None,
1081            cx.clone(),
1082            |_| {},
1083        );
1084        let fake = FakeLanguageServer {
1085            server: Arc::new(LanguageServer::new_internal(
1086                LanguageServerId(0),
1087                stdout_writer,
1088                stdin_reader,
1089                None::<async_pipe::PipeReader>,
1090                Arc::new(Mutex::new(None)),
1091                None,
1092                Path::new("/"),
1093                None,
1094                cx,
1095                move |msg| {
1096                    notifications_tx
1097                        .try_send((
1098                            msg.method.to_string(),
1099                            msg.params
1100                                .map(|raw_value| raw_value.get())
1101                                .unwrap_or("null")
1102                                .to_string(),
1103                        ))
1104                        .ok();
1105                },
1106            )),
1107            notifications_rx,
1108        };
1109        fake.handle_request::<request::Initialize, _, _>({
1110            let capabilities = capabilities;
1111            move |_, _| {
1112                let capabilities = capabilities.clone();
1113                let name = name.clone();
1114                async move {
1115                    Ok(InitializeResult {
1116                        capabilities,
1117                        server_info: Some(ServerInfo {
1118                            name,
1119                            ..Default::default()
1120                        }),
1121                    })
1122                }
1123            }
1124        });
1125
1126        (server, fake)
1127    }
1128}
1129
1130#[cfg(any(test, feature = "test-support"))]
1131impl LanguageServer {
1132    pub fn full_capabilities() -> ServerCapabilities {
1133        ServerCapabilities {
1134            document_highlight_provider: Some(OneOf::Left(true)),
1135            code_action_provider: Some(CodeActionProviderCapability::Simple(true)),
1136            document_formatting_provider: Some(OneOf::Left(true)),
1137            document_range_formatting_provider: Some(OneOf::Left(true)),
1138            definition_provider: Some(OneOf::Left(true)),
1139            type_definition_provider: Some(TypeDefinitionProviderCapability::Simple(true)),
1140            ..Default::default()
1141        }
1142    }
1143}
1144
1145#[cfg(any(test, feature = "test-support"))]
1146impl FakeLanguageServer {
1147    /// See [`LanguageServer::notify`].
1148    pub fn notify<T: notification::Notification>(&self, params: T::Params) {
1149        self.server.notify::<T>(params).ok();
1150    }
1151
1152    /// See [`LanguageServer::request`].
1153    pub async fn request<T>(&self, params: T::Params) -> Result<T::Result>
1154    where
1155        T: request::Request,
1156        T::Result: 'static + Send,
1157    {
1158        self.server.executor.start_waiting();
1159        self.server.request::<T>(params).await
1160    }
1161
1162    /// Attempts [`Self::try_receive_notification`], unwrapping if it has not received the specified type yet.
1163    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
1164        self.server.executor.start_waiting();
1165        self.try_receive_notification::<T>().await.unwrap()
1166    }
1167
1168    /// Consumes the notification channel until it finds a notification for the specified type.
1169    pub async fn try_receive_notification<T: notification::Notification>(
1170        &mut self,
1171    ) -> Option<T::Params> {
1172        use futures::StreamExt as _;
1173
1174        loop {
1175            let (method, params) = self.notifications_rx.next().await?;
1176            if method == T::METHOD {
1177                return Some(serde_json::from_str::<T::Params>(&params).unwrap());
1178            } else {
1179                log::info!("skipping message in fake language server {:?}", params);
1180            }
1181        }
1182    }
1183
1184    /// Registers a handler for a specific kind of request. Removes any existing handler for specified request type.
1185    pub fn handle_request<T, F, Fut>(
1186        &self,
1187        mut handler: F,
1188    ) -> futures::channel::mpsc::UnboundedReceiver<()>
1189    where
1190        T: 'static + request::Request,
1191        T::Params: 'static + Send,
1192        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> Fut,
1193        Fut: 'static + Send + Future<Output = Result<T::Result>>,
1194    {
1195        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
1196        self.server.remove_request_handler::<T>();
1197        self.server
1198            .on_request::<T, _, _>(move |params, cx| {
1199                let result = handler(params, cx.clone());
1200                let responded_tx = responded_tx.clone();
1201                let executor = cx.background_executor().clone();
1202                async move {
1203                    executor.simulate_random_delay().await;
1204                    let result = result.await;
1205                    responded_tx.unbounded_send(()).ok();
1206                    result
1207                }
1208            })
1209            .detach();
1210        responded_rx
1211    }
1212
1213    /// Registers a handler for a specific kind of notification. Removes any existing handler for specified notification type.
1214    pub fn handle_notification<T, F>(
1215        &self,
1216        mut handler: F,
1217    ) -> futures::channel::mpsc::UnboundedReceiver<()>
1218    where
1219        T: 'static + notification::Notification,
1220        T::Params: 'static + Send,
1221        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext),
1222    {
1223        let (handled_tx, handled_rx) = futures::channel::mpsc::unbounded();
1224        self.server.remove_notification_handler::<T>();
1225        self.server
1226            .on_notification::<T, _>(move |params, cx| {
1227                handler(params, cx.clone());
1228                handled_tx.unbounded_send(()).ok();
1229            })
1230            .detach();
1231        handled_rx
1232    }
1233
1234    /// Removes any existing handler for specified notification type.
1235    pub fn remove_request_handler<T>(&mut self)
1236    where
1237        T: 'static + request::Request,
1238    {
1239        self.server.remove_request_handler::<T>();
1240    }
1241
1242    /// Simulate that the server has started work and notifies about its progress with the specified token.
1243    pub async fn start_progress(&self, token: impl Into<String>) {
1244        let token = token.into();
1245        self.request::<request::WorkDoneProgressCreate>(WorkDoneProgressCreateParams {
1246            token: NumberOrString::String(token.clone()),
1247        })
1248        .await
1249        .unwrap();
1250        self.notify::<notification::Progress>(ProgressParams {
1251            token: NumberOrString::String(token),
1252            value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(Default::default())),
1253        });
1254    }
1255
1256    /// Simulate that the server has completed work and notifies about that with the specified token.
1257    pub fn end_progress(&self, token: impl Into<String>) {
1258        self.notify::<notification::Progress>(ProgressParams {
1259            token: NumberOrString::String(token.into()),
1260            value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
1261        });
1262    }
1263}
1264
1265pub(self) async fn read_headers<Stdout>(
1266    reader: &mut BufReader<Stdout>,
1267    buffer: &mut Vec<u8>,
1268) -> Result<()>
1269where
1270    Stdout: AsyncRead + Unpin + Send + 'static,
1271{
1272    loop {
1273        if buffer.len() >= HEADER_DELIMITER.len()
1274            && buffer[(buffer.len() - HEADER_DELIMITER.len())..] == HEADER_DELIMITER[..]
1275        {
1276            return Ok(());
1277        }
1278
1279        if reader.read_until(b'\n', buffer).await? == 0 {
1280            return Err(anyhow!("cannot read LSP message headers"));
1281        }
1282    }
1283}
1284
1285#[cfg(test)]
1286mod tests {
1287    use super::*;
1288    use gpui::TestAppContext;
1289
1290    #[ctor::ctor]
1291    fn init_logger() {
1292        if std::env::var("RUST_LOG").is_ok() {
1293            env_logger::init();
1294        }
1295    }
1296
1297    #[gpui::test]
1298    async fn test_fake(cx: &mut TestAppContext) {
1299        cx.update(|cx| {
1300            release_channel::init("0.0.0", cx);
1301        });
1302        let (server, mut fake) =
1303            FakeLanguageServer::new("the-lsp".to_string(), Default::default(), cx.to_async());
1304
1305        let (message_tx, message_rx) = channel::unbounded();
1306        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
1307        server
1308            .on_notification::<notification::ShowMessage, _>(move |params, _| {
1309                message_tx.try_send(params).unwrap()
1310            })
1311            .detach();
1312        server
1313            .on_notification::<notification::PublishDiagnostics, _>(move |params, _| {
1314                diagnostics_tx.try_send(params).unwrap()
1315            })
1316            .detach();
1317
1318        let server = cx.update(|cx| server.initialize(None, cx)).await.unwrap();
1319        server
1320            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
1321                text_document: TextDocumentItem::new(
1322                    Url::from_str("file://a/b").unwrap(),
1323                    "rust".to_string(),
1324                    0,
1325                    "".to_string(),
1326                ),
1327            })
1328            .unwrap();
1329        assert_eq!(
1330            fake.receive_notification::<notification::DidOpenTextDocument>()
1331                .await
1332                .text_document
1333                .uri
1334                .as_str(),
1335            "file://a/b"
1336        );
1337
1338        fake.notify::<notification::ShowMessage>(ShowMessageParams {
1339            typ: MessageType::ERROR,
1340            message: "ok".to_string(),
1341        });
1342        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
1343            uri: Url::from_str("file://b/c").unwrap(),
1344            version: Some(5),
1345            diagnostics: vec![],
1346        });
1347        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
1348        assert_eq!(
1349            diagnostics_rx.recv().await.unwrap().uri.as_str(),
1350            "file://b/c"
1351        );
1352
1353        fake.handle_request::<request::Shutdown, _, _>(|_, _| async move { Ok(()) });
1354
1355        drop(server);
1356        fake.receive_notification::<notification::Exit>().await;
1357    }
1358
1359    #[gpui::test]
1360    async fn test_read_headers() {
1361        let mut buf = Vec::new();
1362        let mut reader = smol::io::BufReader::new(b"Content-Length: 123\r\n\r\n" as &[u8]);
1363        read_headers(&mut reader, &mut buf).await.unwrap();
1364        assert_eq!(buf, b"Content-Length: 123\r\n\r\n");
1365
1366        let mut buf = Vec::new();
1367        let mut reader = smol::io::BufReader::new(b"Content-Type: application/vscode-jsonrpc\r\nContent-Length: 1235\r\n\r\n{\"somecontent\":123}" as &[u8]);
1368        read_headers(&mut reader, &mut buf).await.unwrap();
1369        assert_eq!(
1370            buf,
1371            b"Content-Type: application/vscode-jsonrpc\r\nContent-Length: 1235\r\n\r\n"
1372        );
1373
1374        let mut buf = Vec::new();
1375        let mut reader = smol::io::BufReader::new(b"Content-Length: 1235\r\nContent-Type: application/vscode-jsonrpc\r\n\r\n{\"somecontent\":true}" as &[u8]);
1376        read_headers(&mut reader, &mut buf).await.unwrap();
1377        assert_eq!(
1378            buf,
1379            b"Content-Length: 1235\r\nContent-Type: application/vscode-jsonrpc\r\n\r\n"
1380        );
1381    }
1382
1383    #[gpui::test]
1384    fn test_deserialize_string_digit_id() {
1385        let json = r#"{"jsonrpc":"2.0","id":"2","method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
1386        let notification = serde_json::from_str::<AnyNotification>(json)
1387            .expect("message with string id should be parsed");
1388        let expected_id = RequestId::Str("2".to_string());
1389        assert_eq!(notification.id, Some(expected_id));
1390    }
1391
1392    #[gpui::test]
1393    fn test_deserialize_string_id() {
1394        let json = r#"{"jsonrpc":"2.0","id":"anythingAtAll","method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
1395        let notification = serde_json::from_str::<AnyNotification>(json)
1396            .expect("message with string id should be parsed");
1397        let expected_id = RequestId::Str("anythingAtAll".to_string());
1398        assert_eq!(notification.id, Some(expected_id));
1399    }
1400
1401    #[gpui::test]
1402    fn test_deserialize_int_id() {
1403        let json = r#"{"jsonrpc":"2.0","id":2,"method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
1404        let notification = serde_json::from_str::<AnyNotification>(json)
1405            .expect("message with string id should be parsed");
1406        let expected_id = RequestId::Int(2);
1407        assert_eq!(notification.id, Some(expected_id));
1408    }
1409}