lsp.rs

   1use log::warn;
   2pub use lsp_types::request::*;
   3pub use lsp_types::*;
   4
   5use anyhow::{anyhow, Context, Result};
   6use collections::HashMap;
   7use futures::{channel::oneshot, io::BufWriter, select, AsyncRead, AsyncWrite, Future, FutureExt};
   8use gpui::{AppContext, AsyncAppContext, BackgroundExecutor, Task};
   9use parking_lot::Mutex;
  10use postage::{barrier, prelude::Stream};
  11use serde::{de::DeserializeOwned, Deserialize, Serialize};
  12use serde_json::{json, value::RawValue, Value};
  13use smol::{
  14    channel,
  15    io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
  16    process::{self, Child},
  17};
  18
  19#[cfg(target_os = "windows")]
  20use smol::process::windows::CommandExt;
  21
  22use std::{
  23    ffi::OsString,
  24    fmt,
  25    io::Write,
  26    path::PathBuf,
  27    pin::Pin,
  28    str::{self, FromStr as _},
  29    sync::{
  30        atomic::{AtomicI32, Ordering::SeqCst},
  31        Arc, Weak,
  32    },
  33    task::Poll,
  34    time::{Duration, Instant},
  35};
  36use std::{path::Path, process::Stdio};
  37use util::{ResultExt, TryFutureExt};
  38
  39const HEADER_DELIMITER: &'static [u8; 4] = b"\r\n\r\n";
  40const JSON_RPC_VERSION: &str = "2.0";
  41const CONTENT_LEN_HEADER: &str = "Content-Length: ";
  42const LSP_REQUEST_TIMEOUT: Duration = Duration::from_secs(60 * 2);
  43const SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
  44
/// Callback for an incoming message whose method has a registered handler. `handle_input`
/// passes it the message id (if any), the raw JSON text of `params` (`"null"` when absent),
/// and a clone of the [`AsyncAppContext`].
type NotificationHandler = Box<dyn Send + FnMut(Option<RequestId>, &str, AsyncAppContext)>;
/// One-shot callback for the response to an outgoing request: either the raw JSON text of the
/// result or the error reported by the server.
type ResponseHandler = Box<dyn Send + FnOnce(Result<String, Error>)>;
/// Callback observing raw traffic on the server's stdio streams, tagged with the stream kind.
type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
  48
/// Kind of language server stdio given to an IO handler.
#[derive(Debug, Clone, Copy)]
pub enum IoKind {
    /// A message read from the server's stdout.
    StdOut,
    /// A message written to the server's stdin.
    StdIn,
    /// A line read from the server's stderr.
    StdErr,
}
  56
/// Represents a launchable language server. This can either be a standalone binary or the path
/// to a runtime with arguments to instruct it to launch the actual language server file.
#[derive(Debug, Clone, Deserialize)]
pub struct LanguageServerBinary {
    /// Program to execute.
    pub path: PathBuf,
    /// Command-line arguments passed to the program.
    pub arguments: Vec<OsString>,
    /// Extra environment variables for the process; `None` adds none.
    pub env: Option<HashMap<String, String>>,
}
  65
/// A running language server process.
///
/// Holds the channel, handler tables and IO tasks used to exchange messages with the server
/// over its stdio streams.
pub struct LanguageServer {
    /// Identifier supplied by whoever created this server.
    server_id: LanguageServerId,
    /// Counter handing out fresh ids; `on_io` draws its subscription ids from it.
    next_id: AtomicI32,
    /// Queue of serialized outgoing messages, drained by `handle_output`.
    outbound_tx: channel::Sender<String>,
    /// Server name: the binary's file name, replaced by the name the server reports in its
    /// `initialize` response when it provides one.
    name: Arc<str>,
    /// Capabilities reported by the server in its `initialize` response.
    capabilities: ServerCapabilities,
    /// Code action kinds supplied at construction time.
    code_action_kinds: Option<Vec<CodeActionKind>>,
    /// Handlers for incoming messages, keyed by method name.
    notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
    /// Handlers awaiting responses, keyed by request id. Becomes `None` once either IO loop
    /// exits or `shutdown` runs.
    response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
    /// Observers of raw stdio traffic, keyed by subscription id.
    io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
    /// Background executor; `shutdown` uses it for its timeout timer.
    executor: BackgroundExecutor,
    /// The `(input, output)` IO tasks; taken by the first call to `shutdown`.
    #[allow(clippy::type_complexity)]
    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
    /// Receiver of the barrier whose sender `handle_output` drops when it finishes writing.
    output_done_rx: Mutex<Option<barrier::Receiver>>,
    /// Root path handed to the constructor; sent to the server as the workspace root.
    root_path: PathBuf,
    /// The child process, if this server was started from a binary; taken and killed by
    /// `shutdown`.
    server: Arc<Mutex<Option<Child>>>,
}
  84
/// Identifies a running language server.
///
/// A transparent newtype around `usize`, so ids are cheap to copy, compare, order and hash.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct LanguageServerId(pub usize);
  89
/// Handle to a language server RPC activity subscription.
pub enum Subscription {
    /// Subscription to incoming messages with a particular method name.
    Notification {
        /// Method name the handler is registered under.
        method: &'static str,
        /// Table that holds the handler.
        // NOTE(review): presumably `None` once the subscription is detached; the code that
        // clears it is outside this excerpt, so confirm.
        notification_handlers: Option<Arc<Mutex<HashMap<&'static str, NotificationHandler>>>>,
    },
    /// Subscription to raw stdio traffic, as created by `LanguageServer::on_io`.
    Io {
        /// Key of the handler in the IO handler table.
        id: i32,
        /// Weak reference to the table that holds the handler, so the subscription does not
        /// keep the table alive.
        io_handlers: Option<Weak<Mutex<HashMap<i32, IoHandler>>>>,
    },
}
 101
/// Language server protocol RPC request message ID.
///
/// Serialized untagged, so an id appears on the wire as a bare JSON number or string.
///
/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(untagged)]
pub enum RequestId {
    /// Numeric id.
    Int(i32),
    /// String id.
    Str(String),
}
 111
/// Language server protocol RPC request message.
///
/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
#[derive(Serialize, Deserialize)]
pub struct Request<'a, T> {
    /// JSON-RPC protocol version.
    jsonrpc: &'static str,
    /// Id used to match the response to this request.
    id: RequestId,
    /// Name of the method being invoked.
    method: &'a str,
    /// Method-specific parameters.
    params: T,
}
 122
/// Language server protocol RPC request response message before it is deserialized into a concrete type.
///
/// `handle_input` reports `error` when it is present, otherwise the raw `result`, and treats a
/// response carrying neither as a `null` result.
#[derive(Serialize, Deserialize)]
struct AnyResponse<'a> {
    /// JSON-RPC protocol version.
    jsonrpc: &'a str,
    /// Id of the request this message answers.
    id: RequestId,
    /// Error reported by the server, if any.
    #[serde(default)]
    error: Option<Error>,
    /// Raw, still-unparsed JSON of the result, borrowed from the input buffer.
    #[serde(borrow)]
    result: Option<&'a RawValue>,
}
 133
/// Language server protocol RPC request response message.
///
/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#responseMessage)
#[derive(Serialize)]
struct Response<T> {
    /// JSON-RPC protocol version.
    jsonrpc: &'static str,
    /// Id of the request being answered.
    id: RequestId,
    /// Result payload, if any.
    result: Option<T>,
    /// Error payload, if any.
    error: Option<Error>,
}
 144
/// Language server protocol RPC notification message.
///
/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
#[derive(Serialize, Deserialize)]
struct Notification<'a, T> {
    /// JSON-RPC protocol version.
    jsonrpc: &'static str,
    /// Name of the notification method.
    #[serde(borrow)]
    method: &'a str,
    /// Method-specific parameters.
    params: T,
}
 155
/// Language server RPC notification message before it is deserialized into a concrete type.
///
/// Because `id` is optional, any incoming message that carries a `method` fits this shape;
/// `handle_input` forwards the id to the handler along with the raw parameters.
#[derive(Debug, Clone, Deserialize)]
struct AnyNotification<'a> {
    /// Message id; absent when the message has none.
    #[serde(default)]
    id: Option<RequestId>,
    /// Method name, borrowed from the input buffer.
    #[serde(borrow)]
    method: &'a str,
    /// Raw, still-unparsed JSON of the parameters, if any were sent.
    #[serde(borrow, default)]
    params: Option<&'a RawValue>,
}
 166
/// Error payload of an RPC response. Only the `message` member is modeled here.
#[derive(Debug, Serialize, Deserialize)]
struct Error {
    /// Human-readable description of the failure.
    message: String,
}
 171
 172pub trait LspRequestFuture<O>: Future<Output = O> {
 173    fn id(&self) -> i32;
 174}
 175
 176struct LspRequest<F> {
 177    id: i32,
 178    request: F,
 179}
 180
 181impl<F> LspRequest<F> {
 182    pub fn new(id: i32, request: F) -> Self {
 183        Self { id, request }
 184    }
 185}
 186
 187impl<F: Future> Future for LspRequest<F> {
 188    type Output = F::Output;
 189
 190    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
 191        // SAFETY: This is standard pin projection, we're pinned so our fields must be pinned.
 192        let inner = unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().request) };
 193        inner.poll(cx)
 194    }
 195}
 196
 197impl<F: Future> LspRequestFuture<F::Output> for LspRequest<F> {
 198    fn id(&self) -> i32 {
 199        self.id
 200    }
 201}
 202
/// Experimental: Informs the end user about the state of the server
///
/// This is an uninhabited marker type: it is never instantiated and exists only to carry the
/// notification's method name and parameter type.
///
/// [Rust Analyzer Specification](https://github.com/rust-lang/rust-analyzer/blob/master/docs/dev/lsp-extensions.md#server-status)
#[derive(Debug)]
pub enum ServerStatus {}
 208
/// Health of the server as reported in a server status notification.
///
/// Other(String) variant to handle unknown values due to this still being experimental
// NOTE(review): with serde's default externally tagged enum representation, `Other` matches
// `{"other": "<text>"}` rather than an arbitrary unknown string — confirm that this is the
// intended fallback behavior.
#[derive(Debug, PartialEq, Deserialize, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub enum ServerHealthStatus {
    /// Serialized as `"ok"`.
    Ok,
    /// Serialized as `"warning"`.
    Warning,
    /// Serialized as `"error"`.
    Error,
    /// A status outside the known set, carrying its text.
    Other(String),
}
 218
/// Parameters of the `experimental/serverStatus` notification.
#[derive(Debug, PartialEq, Deserialize, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ServerStatusParams {
    /// Reported health of the server.
    pub health: ServerHealthStatus,
    /// Optional message accompanying the status.
    pub message: Option<String>,
}
 225
/// Ties [`ServerStatus`] to the `experimental/serverStatus` method and its parameter type so
/// it can be used wherever a typed notification is expected.
impl lsp_types::notification::Notification for ServerStatus {
    type Params = ServerStatusParams;
    const METHOD: &'static str = "experimental/serverStatus";
}
 230
 231impl LanguageServer {
 232    /// Starts a language server process.
 233    pub fn new(
 234        stderr_capture: Arc<Mutex<Option<String>>>,
 235        server_id: LanguageServerId,
 236        binary: LanguageServerBinary,
 237        root_path: &Path,
 238        code_action_kinds: Option<Vec<CodeActionKind>>,
 239        cx: AsyncAppContext,
 240    ) -> Result<Self> {
 241        let working_dir = if root_path.is_dir() {
 242            root_path
 243        } else {
 244            root_path.parent().unwrap_or_else(|| Path::new("/"))
 245        };
 246
 247        log::info!(
 248            "starting language server. binary path: {:?}, working directory: {:?}, args: {:?}",
 249            binary.path,
 250            working_dir,
 251            &binary.arguments
 252        );
 253
 254        let mut command = process::Command::new(&binary.path);
 255        command
 256            .current_dir(working_dir)
 257            .args(binary.arguments)
 258            .envs(binary.env.unwrap_or_default())
 259            .stdin(Stdio::piped())
 260            .stdout(Stdio::piped())
 261            .stderr(Stdio::piped())
 262            .kill_on_drop(true);
 263        #[cfg(windows)]
 264        command.creation_flags(windows::Win32::System::Threading::CREATE_NO_WINDOW.0);
 265        let mut server = command.spawn()?;
 266
 267        let stdin = server.stdin.take().unwrap();
 268        let stdout = server.stdout.take().unwrap();
 269        let stderr = server.stderr.take().unwrap();
 270        let mut server = Self::new_internal(
 271            server_id,
 272            stdin,
 273            stdout,
 274            Some(stderr),
 275            stderr_capture,
 276            Some(server),
 277            root_path,
 278            code_action_kinds,
 279            cx,
 280            move |notification| {
 281                log::info!(
 282                    "Language server with id {} sent unhandled notification {}:\n{}",
 283                    server_id,
 284                    notification.method,
 285                    serde_json::to_string_pretty(
 286                        &notification
 287                            .params
 288                            .and_then(|params| Value::from_str(params.get()).ok())
 289                            .unwrap_or(Value::Null)
 290                    )
 291                    .unwrap(),
 292                );
 293            },
 294        );
 295
 296        if let Some(name) = binary.path.file_name() {
 297            server.name = name.to_string_lossy().into();
 298        }
 299
 300        Ok(server)
 301    }
 302
 303    #[allow(clippy::too_many_arguments)]
 304    fn new_internal<Stdin, Stdout, Stderr, F>(
 305        server_id: LanguageServerId,
 306        stdin: Stdin,
 307        stdout: Stdout,
 308        stderr: Option<Stderr>,
 309        stderr_capture: Arc<Mutex<Option<String>>>,
 310        server: Option<Child>,
 311        root_path: &Path,
 312        code_action_kinds: Option<Vec<CodeActionKind>>,
 313        cx: AsyncAppContext,
 314        on_unhandled_notification: F,
 315    ) -> Self
 316    where
 317        Stdin: AsyncWrite + Unpin + Send + 'static,
 318        Stdout: AsyncRead + Unpin + Send + 'static,
 319        Stderr: AsyncRead + Unpin + Send + 'static,
 320        F: FnMut(AnyNotification) + 'static + Send + Sync + Clone,
 321    {
 322        let (outbound_tx, outbound_rx) = channel::unbounded::<String>();
 323        let (output_done_tx, output_done_rx) = barrier::channel();
 324        let notification_handlers =
 325            Arc::new(Mutex::new(HashMap::<_, NotificationHandler>::default()));
 326        let response_handlers =
 327            Arc::new(Mutex::new(Some(HashMap::<_, ResponseHandler>::default())));
 328        let io_handlers = Arc::new(Mutex::new(HashMap::default()));
 329
 330        let stdout_input_task = cx.spawn({
 331            let on_unhandled_notification = on_unhandled_notification.clone();
 332            let notification_handlers = notification_handlers.clone();
 333            let response_handlers = response_handlers.clone();
 334            let io_handlers = io_handlers.clone();
 335            move |cx| {
 336                Self::handle_input(
 337                    stdout,
 338                    on_unhandled_notification,
 339                    notification_handlers,
 340                    response_handlers,
 341                    io_handlers,
 342                    cx,
 343                )
 344                .log_err()
 345            }
 346        });
 347        let stderr_input_task = stderr
 348            .map(|stderr| {
 349                let io_handlers = io_handlers.clone();
 350                let stderr_captures = stderr_capture.clone();
 351                cx.spawn(|_| Self::handle_stderr(stderr, io_handlers, stderr_captures).log_err())
 352            })
 353            .unwrap_or_else(|| Task::Ready(Some(None)));
 354        let input_task = cx.spawn(|_| async move {
 355            let (stdout, stderr) = futures::join!(stdout_input_task, stderr_input_task);
 356            stdout.or(stderr)
 357        });
 358        let output_task = cx.background_executor().spawn({
 359            Self::handle_output(
 360                stdin,
 361                outbound_rx,
 362                output_done_tx,
 363                response_handlers.clone(),
 364                io_handlers.clone(),
 365            )
 366            .log_err()
 367        });
 368
 369        Self {
 370            server_id,
 371            notification_handlers,
 372            response_handlers,
 373            io_handlers,
 374            name: "".into(),
 375            capabilities: Default::default(),
 376            code_action_kinds,
 377            next_id: Default::default(),
 378            outbound_tx,
 379            executor: cx.background_executor().clone(),
 380            io_tasks: Mutex::new(Some((input_task, output_task))),
 381            output_done_rx: Mutex::new(Some(output_done_rx)),
 382            root_path: root_path.to_path_buf(),
 383            server: Arc::new(Mutex::new(server)),
 384        }
 385    }
 386
    /// List of code action kinds this language server reports being able to emit.
    ///
    /// Returns a clone of the list supplied at construction time, or `None` if none was given.
    pub fn code_action_kinds(&self) -> Option<Vec<CodeActionKind>> {
        self.code_action_kinds.clone()
    }
 391
 392    async fn handle_input<Stdout, F>(
 393        stdout: Stdout,
 394        mut on_unhandled_notification: F,
 395        notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
 396        response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
 397        io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 398        cx: AsyncAppContext,
 399    ) -> anyhow::Result<()>
 400    where
 401        Stdout: AsyncRead + Unpin + Send + 'static,
 402        F: FnMut(AnyNotification) + 'static + Send,
 403    {
 404        let mut stdout = BufReader::new(stdout);
 405        let _clear_response_handlers = util::defer({
 406            let response_handlers = response_handlers.clone();
 407            move || {
 408                response_handlers.lock().take();
 409            }
 410        });
 411        let mut buffer = Vec::new();
 412        loop {
 413            buffer.clear();
 414
 415            read_headers(&mut stdout, &mut buffer).await?;
 416
 417            let headers = std::str::from_utf8(&buffer)?;
 418
 419            let message_len = headers
 420                .split('\n')
 421                .find(|line| line.starts_with(CONTENT_LEN_HEADER))
 422                .and_then(|line| line.strip_prefix(CONTENT_LEN_HEADER))
 423                .ok_or_else(|| anyhow!("invalid LSP message header {headers:?}"))?
 424                .trim_end()
 425                .parse()?;
 426
 427            buffer.resize(message_len, 0);
 428            stdout.read_exact(&mut buffer).await?;
 429
 430            if let Ok(message) = str::from_utf8(&buffer) {
 431                log::trace!("incoming message: {message}");
 432                for handler in io_handlers.lock().values_mut() {
 433                    handler(IoKind::StdOut, message);
 434                }
 435            }
 436
 437            if let Ok(msg) = serde_json::from_slice::<AnyNotification>(&buffer) {
 438                if let Some(handler) = notification_handlers.lock().get_mut(msg.method) {
 439                    handler(
 440                        msg.id,
 441                        msg.params.map(|params| params.get()).unwrap_or("null"),
 442                        cx.clone(),
 443                    );
 444                } else {
 445                    on_unhandled_notification(msg);
 446                }
 447            } else if let Ok(AnyResponse {
 448                id, error, result, ..
 449            }) = serde_json::from_slice(&buffer)
 450            {
 451                if let Some(handler) = response_handlers
 452                    .lock()
 453                    .as_mut()
 454                    .and_then(|handlers| handlers.remove(&id))
 455                {
 456                    if let Some(error) = error {
 457                        handler(Err(error));
 458                    } else if let Some(result) = result {
 459                        handler(Ok(result.get().into()));
 460                    } else {
 461                        handler(Ok("null".into()));
 462                    }
 463                }
 464            } else {
 465                warn!(
 466                    "failed to deserialize LSP message:\n{}",
 467                    std::str::from_utf8(&buffer)?
 468                );
 469            }
 470
 471            // Don't starve the main thread when receiving lots of messages at once.
 472            smol::future::yield_now().await;
 473        }
 474    }
 475
 476    async fn handle_stderr<Stderr>(
 477        stderr: Stderr,
 478        io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 479        stderr_capture: Arc<Mutex<Option<String>>>,
 480    ) -> anyhow::Result<()>
 481    where
 482        Stderr: AsyncRead + Unpin + Send + 'static,
 483    {
 484        let mut stderr = BufReader::new(stderr);
 485        let mut buffer = Vec::new();
 486
 487        loop {
 488            buffer.clear();
 489
 490            let bytes_read = stderr.read_until(b'\n', &mut buffer).await?;
 491            if bytes_read == 0 {
 492                return Ok(());
 493            }
 494
 495            if let Ok(message) = str::from_utf8(&buffer) {
 496                log::trace!("incoming stderr message:{message}");
 497                for handler in io_handlers.lock().values_mut() {
 498                    handler(IoKind::StdErr, message);
 499                }
 500
 501                if let Some(stderr) = stderr_capture.lock().as_mut() {
 502                    stderr.push_str(message);
 503                }
 504            }
 505
 506            // Don't starve the main thread when receiving lots of messages at once.
 507            smol::future::yield_now().await;
 508        }
 509    }
 510
 511    async fn handle_output<Stdin>(
 512        stdin: Stdin,
 513        outbound_rx: channel::Receiver<String>,
 514        output_done_tx: barrier::Sender,
 515        response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
 516        io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 517    ) -> anyhow::Result<()>
 518    where
 519        Stdin: AsyncWrite + Unpin + Send + 'static,
 520    {
 521        let mut stdin = BufWriter::new(stdin);
 522        let _clear_response_handlers = util::defer({
 523            let response_handlers = response_handlers.clone();
 524            move || {
 525                response_handlers.lock().take();
 526            }
 527        });
 528        let mut content_len_buffer = Vec::new();
 529        while let Ok(message) = outbound_rx.recv().await {
 530            log::trace!("outgoing message:{}", message);
 531            for handler in io_handlers.lock().values_mut() {
 532                handler(IoKind::StdIn, &message);
 533            }
 534
 535            content_len_buffer.clear();
 536            write!(content_len_buffer, "{}", message.len()).unwrap();
 537            stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
 538            stdin.write_all(&content_len_buffer).await?;
 539            stdin.write_all("\r\n\r\n".as_bytes()).await?;
 540            stdin.write_all(message.as_bytes()).await?;
 541            stdin.flush().await?;
 542        }
 543        drop(output_done_tx);
 544        Ok(())
 545    }
 546
 547    /// Initializes a language server by sending the `Initialize` request.
 548    /// Note that `options` is used directly to construct [`InitializeParams`], which is why it is owned.
 549    ///
 550    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#initialize)
 551    pub fn initialize(
 552        mut self,
 553        options: Option<Value>,
 554        cx: &AppContext,
 555    ) -> Task<Result<Arc<Self>>> {
 556        let root_uri = Url::from_file_path(&self.root_path).unwrap();
 557        #[allow(deprecated)]
 558        let params = InitializeParams {
 559            process_id: None,
 560            root_path: None,
 561            root_uri: Some(root_uri.clone()),
 562            initialization_options: options,
 563            capabilities: ClientCapabilities {
 564                workspace: Some(WorkspaceClientCapabilities {
 565                    configuration: Some(true),
 566                    did_change_watched_files: Some(DidChangeWatchedFilesClientCapabilities {
 567                        dynamic_registration: Some(true),
 568                        relative_pattern_support: Some(true),
 569                    }),
 570                    did_change_configuration: Some(DynamicRegistrationClientCapabilities {
 571                        dynamic_registration: Some(true),
 572                    }),
 573                    workspace_folders: Some(true),
 574                    symbol: Some(WorkspaceSymbolClientCapabilities {
 575                        resolve_support: None,
 576                        ..WorkspaceSymbolClientCapabilities::default()
 577                    }),
 578                    inlay_hint: Some(InlayHintWorkspaceClientCapabilities {
 579                        refresh_support: Some(true),
 580                    }),
 581                    diagnostic: Some(DiagnosticWorkspaceClientCapabilities {
 582                        refresh_support: None,
 583                    }),
 584                    workspace_edit: Some(WorkspaceEditClientCapabilities {
 585                        resource_operations: Some(vec![
 586                            ResourceOperationKind::Create,
 587                            ResourceOperationKind::Rename,
 588                            ResourceOperationKind::Delete,
 589                        ]),
 590                        document_changes: Some(true),
 591                        ..WorkspaceEditClientCapabilities::default()
 592                    }),
 593                    ..Default::default()
 594                }),
 595                text_document: Some(TextDocumentClientCapabilities {
 596                    definition: Some(GotoCapability {
 597                        link_support: Some(true),
 598                        dynamic_registration: None,
 599                    }),
 600                    code_action: Some(CodeActionClientCapabilities {
 601                        code_action_literal_support: Some(CodeActionLiteralSupport {
 602                            code_action_kind: CodeActionKindLiteralSupport {
 603                                value_set: vec![
 604                                    CodeActionKind::REFACTOR.as_str().into(),
 605                                    CodeActionKind::QUICKFIX.as_str().into(),
 606                                    CodeActionKind::SOURCE.as_str().into(),
 607                                ],
 608                            },
 609                        }),
 610                        data_support: Some(true),
 611                        resolve_support: Some(CodeActionCapabilityResolveSupport {
 612                            properties: vec![
 613                                "kind".to_string(),
 614                                "diagnostics".to_string(),
 615                                "isPreferred".to_string(),
 616                                "disabled".to_string(),
 617                                "edit".to_string(),
 618                                "command".to_string(),
 619                            ],
 620                        }),
 621                        ..Default::default()
 622                    }),
 623                    completion: Some(CompletionClientCapabilities {
 624                        completion_item: Some(CompletionItemCapability {
 625                            snippet_support: Some(true),
 626                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
 627                                properties: vec![
 628                                    "documentation".to_string(),
 629                                    "additionalTextEdits".to_string(),
 630                                ],
 631                            }),
 632                            insert_replace_support: Some(true),
 633                            ..Default::default()
 634                        }),
 635                        completion_list: Some(CompletionListCapability {
 636                            item_defaults: Some(vec![
 637                                "commitCharacters".to_owned(),
 638                                "editRange".to_owned(),
 639                                "insertTextMode".to_owned(),
 640                                "data".to_owned(),
 641                            ]),
 642                        }),
 643                        ..Default::default()
 644                    }),
 645                    rename: Some(RenameClientCapabilities {
 646                        prepare_support: Some(true),
 647                        ..Default::default()
 648                    }),
 649                    hover: Some(HoverClientCapabilities {
 650                        content_format: Some(vec![MarkupKind::Markdown]),
 651                        dynamic_registration: None,
 652                    }),
 653                    inlay_hint: Some(InlayHintClientCapabilities {
 654                        resolve_support: Some(InlayHintResolveClientCapabilities {
 655                            properties: vec![
 656                                "textEdits".to_string(),
 657                                "tooltip".to_string(),
 658                                "label.tooltip".to_string(),
 659                                "label.location".to_string(),
 660                                "label.command".to_string(),
 661                            ],
 662                        }),
 663                        dynamic_registration: Some(false),
 664                    }),
 665                    publish_diagnostics: Some(PublishDiagnosticsClientCapabilities {
 666                        related_information: Some(true),
 667                        ..Default::default()
 668                    }),
 669                    formatting: Some(DynamicRegistrationClientCapabilities {
 670                        dynamic_registration: None,
 671                    }),
 672                    on_type_formatting: Some(DynamicRegistrationClientCapabilities {
 673                        dynamic_registration: None,
 674                    }),
 675                    diagnostic: Some(DiagnosticClientCapabilities {
 676                        related_document_support: Some(true),
 677                        dynamic_registration: None,
 678                    }),
 679                    ..Default::default()
 680                }),
 681                experimental: Some(json!({
 682                    "serverStatusNotification": true,
 683                })),
 684                window: Some(WindowClientCapabilities {
 685                    work_done_progress: Some(true),
 686                    ..Default::default()
 687                }),
 688                general: None,
 689            },
 690            trace: None,
 691            workspace_folders: Some(vec![WorkspaceFolder {
 692                uri: root_uri,
 693                name: Default::default(),
 694            }]),
 695            client_info: release_channel::ReleaseChannel::try_global(cx).map(|release_channel| {
 696                ClientInfo {
 697                    name: release_channel.display_name().to_string(),
 698                    version: Some(release_channel::AppVersion::global(cx).to_string()),
 699                }
 700            }),
 701            locale: None,
 702        };
 703
 704        cx.spawn(|_| async move {
 705            let response = self.request::<request::Initialize>(params).await?;
 706            if let Some(info) = response.server_info {
 707                self.name = info.name.into();
 708            }
 709            self.capabilities = response.capabilities;
 710
 711            self.notify::<notification::Initialized>(InitializedParams {})?;
 712            Ok(Arc::new(self))
 713        })
 714    }
 715
 716    /// Sends a shutdown request to the language server process and prepares the [`LanguageServer`] to be dropped.
 717    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
 718        if let Some(tasks) = self.io_tasks.lock().take() {
 719            let response_handlers = self.response_handlers.clone();
 720            let next_id = AtomicI32::new(self.next_id.load(SeqCst));
 721            let outbound_tx = self.outbound_tx.clone();
 722            let executor = self.executor.clone();
 723            let mut output_done = self.output_done_rx.lock().take().unwrap();
 724            let shutdown_request = Self::request_internal::<request::Shutdown>(
 725                &next_id,
 726                &response_handlers,
 727                &outbound_tx,
 728                &executor,
 729                (),
 730            );
 731            let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
 732            outbound_tx.close();
 733
 734            let server = self.server.clone();
 735            let name = self.name.clone();
 736            let mut timer = self.executor.timer(SERVER_SHUTDOWN_TIMEOUT).fuse();
 737            Some(
 738                async move {
 739                    log::debug!("language server shutdown started");
 740
 741                    select! {
 742                        request_result = shutdown_request.fuse() => {
 743                            request_result?;
 744                        }
 745
 746                        _ = timer => {
 747                            log::info!("timeout waiting for language server {name} to shutdown");
 748                        },
 749                    }
 750
 751                    response_handlers.lock().take();
 752                    exit?;
 753                    output_done.recv().await;
 754                    server.lock().take().map(|mut child| child.kill());
 755                    log::debug!("language server shutdown finished");
 756
 757                    drop(tasks);
 758                    anyhow::Ok(())
 759                }
 760                .log_err(),
 761            )
 762        } else {
 763            None
 764        }
 765    }
 766
    /// Register a handler to handle incoming LSP notifications.
    ///
    /// The handler is registered under `T::METHOD` by delegating to
    /// `on_custom_notification`. The returned [`Subscription`] is marked `#[must_use]`.
    ///
    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
    #[must_use]
    pub fn on_notification<T, F>(&self, f: F) -> Subscription
    where
        T: notification::Notification,
        F: 'static + Send + FnMut(T::Params, AsyncAppContext),
    {
        self.on_custom_notification(T::METHOD, f)
    }
 778
 779    /// Register a handler to handle incoming LSP requests.
 780    ///
 781    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
 782    #[must_use]
 783    pub fn on_request<T, F, Fut>(&self, f: F) -> Subscription
 784    where
 785        T: request::Request,
 786        T::Params: 'static + Send,
 787        F: 'static + FnMut(T::Params, AsyncAppContext) -> Fut + Send,
 788        Fut: 'static + Future<Output = Result<T::Result>>,
 789    {
 790        self.on_custom_request(T::METHOD, f)
 791    }
 792
 793    /// Registers a handler to inspect all language server process stdio.
 794    #[must_use]
 795    pub fn on_io<F>(&self, f: F) -> Subscription
 796    where
 797        F: 'static + Send + FnMut(IoKind, &str),
 798    {
 799        let id = self.next_id.fetch_add(1, SeqCst);
 800        self.io_handlers.lock().insert(id, Box::new(f));
 801        Subscription::Io {
 802            id,
 803            io_handlers: Some(Arc::downgrade(&self.io_handlers)),
 804        }
 805    }
 806
 807    /// Removes a request handler registers via [`Self::on_request`].
 808    pub fn remove_request_handler<T: request::Request>(&self) {
 809        self.notification_handlers.lock().remove(T::METHOD);
 810    }
 811
 812    /// Removes a notification handler registers via [`Self::on_notification`].
 813    pub fn remove_notification_handler<T: notification::Notification>(&self) {
 814        self.notification_handlers.lock().remove(T::METHOD);
 815    }
 816
 817    /// Checks if a notification handler has been registered via [`Self::on_notification`].
 818    pub fn has_notification_handler<T: notification::Notification>(&self) -> bool {
 819        self.notification_handlers.lock().contains_key(T::METHOD)
 820    }
 821
 822    #[must_use]
 823    fn on_custom_notification<Params, F>(&self, method: &'static str, mut f: F) -> Subscription
 824    where
 825        F: 'static + FnMut(Params, AsyncAppContext) + Send,
 826        Params: DeserializeOwned,
 827    {
 828        let prev_handler = self.notification_handlers.lock().insert(
 829            method,
 830            Box::new(move |_, params, cx| {
 831                if let Some(params) = serde_json::from_str(params).log_err() {
 832                    f(params, cx);
 833                }
 834            }),
 835        );
 836        assert!(
 837            prev_handler.is_none(),
 838            "registered multiple handlers for the same LSP method"
 839        );
 840        Subscription::Notification {
 841            method,
 842            notification_handlers: Some(self.notification_handlers.clone()),
 843        }
 844    }
 845
    /// Registers `f` as the handler for incoming requests named `method`.
    ///
    /// The handler is stored in the same table as notification handlers. When a
    /// message carrying an id arrives, its parameters are deserialized into
    /// `Params` and passed to `f`; the future returned by `f` is awaited on the
    /// foreground executor and its outcome is queued on the outbound channel as
    /// a JSON-RPC response (either `result` or `error`). Messages without an id
    /// are ignored by this handler.
    ///
    /// # Panics
    ///
    /// Panics if a handler is already registered for `method`.
    #[must_use]
    fn on_custom_request<Params, Res, Fut, F>(&self, method: &'static str, mut f: F) -> Subscription
    where
        F: 'static + FnMut(Params, AsyncAppContext) -> Fut + Send,
        Fut: 'static + Future<Output = Result<Res>>,
        Params: DeserializeOwned + Send + 'static,
        Res: Serialize,
    {
        let outbound_tx = self.outbound_tx.clone();
        let prev_handler = self.notification_handlers.lock().insert(
            method,
            Box::new(move |id, params, cx| {
                // Only messages with an id expect a response.
                if let Some(id) = id {
                    match serde_json::from_str(params) {
                        Ok(params) => {
                            // Invoke the handler right away; only awaiting its
                            // future is deferred to the spawned task.
                            let response = f(params, cx.clone());
                            cx.foreground_executor()
                                .spawn({
                                    let outbound_tx = outbound_tx.clone();
                                    async move {
                                        let response = match response.await {
                                            Ok(result) => Response {
                                                jsonrpc: JSON_RPC_VERSION,
                                                id,
                                                result: Some(result),
                                                error: None,
                                            },
                                            Err(error) => Response {
                                                jsonrpc: JSON_RPC_VERSION,
                                                id,
                                                result: None,
                                                error: Some(Error {
                                                    message: error.to_string(),
                                                }),
                                            },
                                        };
                                        if let Some(response) =
                                            serde_json::to_string(&response).log_err()
                                        {
                                            // Best effort: a failed send is ignored.
                                            outbound_tx.try_send(response).ok();
                                        }
                                    }
                                })
                                .detach();
                        }

                        Err(error) => {
                            // The parameters could not be parsed: log it and
                            // answer with an error response so the request is
                            // not left unanswered.
                            log::error!(
                                "error deserializing {} request: {:?}, message: {:?}",
                                method,
                                error,
                                params
                            );
                            let response = AnyResponse {
                                jsonrpc: JSON_RPC_VERSION,
                                id,
                                result: None,
                                error: Some(Error {
                                    message: error.to_string(),
                                }),
                            };
                            if let Some(response) = serde_json::to_string(&response).log_err() {
                                outbound_tx.try_send(response).ok();
                            }
                        }
                    }
                }
            }),
        );
        assert!(
            prev_handler.is_none(),
            "registered multiple handlers for the same LSP method"
        );
        Subscription::Notification {
            method,
            notification_handlers: Some(self.notification_handlers.clone()),
        }
    }
 924
    /// Returns the name stored for this language server.
    pub fn name(&self) -> &str {
        &self.name
    }
 929
    /// Returns the capabilities stored for this language server.
    // NOTE(review): presumably populated from the server's `initialize` response — confirm.
    pub fn capabilities(&self) -> &ServerCapabilities {
        &self.capabilities
    }
 934
    /// Returns the id assigned to this language server.
    pub fn server_id(&self) -> LanguageServerId {
        self.server_id
    }
 939
    /// Returns the root path of the project the language server is running against.
    pub fn root_path(&self) -> &PathBuf {
        &self.root_path
    }
 944
 945    /// Sends a RPC request to the language server.
 946    ///
 947    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
 948    pub fn request<T: request::Request>(
 949        &self,
 950        params: T::Params,
 951    ) -> impl LspRequestFuture<Result<T::Result>>
 952    where
 953        T::Result: 'static + Send,
 954    {
 955        Self::request_internal::<T>(
 956            &self.next_id,
 957            &self.response_handlers,
 958            &self.outbound_tx,
 959            &self.executor,
 960            params,
 961        )
 962    }
 963
    /// Builds, registers and sends a request of type `T`, returning a future
    /// for its response.
    ///
    /// The message is serialized and queued on `outbound_tx` immediately, before
    /// the returned future is first polled. The future then yields:
    /// - an error if the response handler table was already taken
    ///   ("server shut down") or the message could not be queued;
    /// - the deserialized response, or the error reported by the server;
    /// - an "LSP request timeout" error once `LSP_REQUEST_TIMEOUT` elapses.
    ///
    /// # Panics
    ///
    /// Panics if the request cannot be serialized to JSON.
    fn request_internal<T: request::Request>(
        next_id: &AtomicI32,
        response_handlers: &Mutex<Option<HashMap<RequestId, ResponseHandler>>>,
        outbound_tx: &channel::Sender<String>,
        executor: &BackgroundExecutor,
        params: T::Params,
    ) -> impl LspRequestFuture<Result<T::Result>>
    where
        T::Result: 'static + Send,
    {
        // Allocate a fresh id for this request.
        let id = next_id.fetch_add(1, SeqCst);
        let message = serde_json::to_string(&Request {
            jsonrpc: JSON_RPC_VERSION,
            id: RequestId::Int(id),
            method: T::METHOD,
            params,
        })
        .unwrap();

        // Register the response handler before sending the message. The handler
        // deserializes the raw response on the background executor and forwards
        // the outcome through the oneshot channel.
        let (tx, rx) = oneshot::channel();
        let handle_response = response_handlers
            .lock()
            .as_mut()
            .ok_or_else(|| anyhow!("server shut down"))
            .map(|handlers| {
                let executor = executor.clone();
                handlers.insert(
                    RequestId::Int(id),
                    Box::new(move |result| {
                        executor
                            .spawn(async move {
                                let response = match result {
                                    Ok(response) => match serde_json::from_str(&response) {
                                        Ok(deserialized) => Ok(deserialized),
                                        Err(error) => {
                                            log::error!("failed to deserialize response from language server: {}. response from language server: {:?}", error, response);
                                            Err(error).context("failed to deserialize response")
                                        }
                                    }
                                    Err(error) => Err(anyhow!("{}", error.message)),
                                };
                                // The receiver may already be gone (request dropped); ignore that.
                                _ = tx.send(response);
                            })
                            .detach();
                    }),
                );
            });

        // Queue the message now; the result is only inspected inside the future.
        let send = outbound_tx
            .try_send(message)
            .context("failed to write to language server's stdin");

        // Hold only a weak sender so a pending request does not keep the
        // outbound channel open.
        let outbound_tx = outbound_tx.downgrade();
        let mut timeout = executor.timer(LSP_REQUEST_TIMEOUT).fuse();
        let started = Instant::now();
        LspRequest::new(id, async move {
            handle_response?;
            send?;

            // NOTE(review): `util::defer` presumably runs the closure when the guard
            // is dropped unless `abort()` was called — confirm. With that reading, a
            // `$/cancelRequest` notification is sent when the future is dropped or
            // times out before a response arrives.
            let cancel_on_drop = util::defer(move || {
                if let Some(outbound_tx) = outbound_tx.upgrade() {
                    Self::notify_internal::<notification::Cancel>(
                        &outbound_tx,
                        CancelParams {
                            id: NumberOrString::Number(id),
                        },
                    )
                    .log_err();
                }
            });

            let method = T::METHOD;
            select! {
                response = rx.fuse() => {
                    let elapsed = started.elapsed();
                    log::trace!("Took {elapsed:?} to receive response to {method:?} id {id}");
                    // A response arrived, so no cancellation is needed.
                    cancel_on_drop.abort();
                    response?
                }

                _ = timeout => {
                    log::error!("Cancelled LSP request task for {method:?} id {id} which took over {LSP_REQUEST_TIMEOUT:?}");
                    anyhow::bail!("LSP request timeout");
                }
            }
        })
    }
1051
1052    /// Sends a RPC notification to the language server.
1053    ///
1054    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
1055    pub fn notify<T: notification::Notification>(&self, params: T::Params) -> Result<()> {
1056        Self::notify_internal::<T>(&self.outbound_tx, params)
1057    }
1058
1059    fn notify_internal<T: notification::Notification>(
1060        outbound_tx: &channel::Sender<String>,
1061        params: T::Params,
1062    ) -> Result<()> {
1063        let message = serde_json::to_string(&Notification {
1064            jsonrpc: JSON_RPC_VERSION,
1065            method: T::METHOD,
1066            params,
1067        })
1068        .unwrap();
1069        outbound_tx.try_send(message)?;
1070        Ok(())
1071    }
1072}
1073
1074impl Drop for LanguageServer {
1075    fn drop(&mut self) {
1076        if let Some(shutdown) = self.shutdown() {
1077            self.executor.spawn(shutdown).detach();
1078        }
1079    }
1080}
1081
1082impl Subscription {
1083    /// Detaching a subscription handle prevents it from unsubscribing on drop.
1084    pub fn detach(&mut self) {
1085        match self {
1086            Subscription::Notification {
1087                notification_handlers,
1088                ..
1089            } => *notification_handlers = None,
1090            Subscription::Io { io_handlers, .. } => *io_handlers = None,
1091        }
1092    }
1093}
1094
1095impl fmt::Display for LanguageServerId {
1096    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1097        self.0.fmt(f)
1098    }
1099}
1100
1101impl fmt::Debug for LanguageServer {
1102    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1103        f.debug_struct("LanguageServer")
1104            .field("id", &self.server_id.0)
1105            .field("name", &self.name)
1106            .finish_non_exhaustive()
1107    }
1108}
1109
1110impl Drop for Subscription {
1111    fn drop(&mut self) {
1112        match self {
1113            Subscription::Notification {
1114                method,
1115                notification_handlers,
1116            } => {
1117                if let Some(handlers) = notification_handlers {
1118                    handlers.lock().remove(method);
1119                }
1120            }
1121            Subscription::Io { id, io_handlers } => {
1122                if let Some(io_handlers) = io_handlers.as_ref().and_then(|h| h.upgrade()) {
1123                    io_handlers.lock().remove(id);
1124                }
1125            }
1126        }
1127    }
1128}
1129
/// Mock language server for use in tests.
///
/// It wraps a second [`LanguageServer`] instance that plays the server side of
/// the connection, and records the notifications that instance receives.
#[cfg(any(test, feature = "test-support"))]
#[derive(Clone)]
pub struct FakeLanguageServer {
    /// Binary description this fake was constructed with.
    pub binary: LanguageServerBinary,
    /// The `LanguageServer` instance acting as the server end of the connection.
    pub server: Arc<LanguageServer>,
    /// Receives `(method, params)` pairs for messages the fake's fallback
    /// callback was invoked with; params are raw JSON text, or `"null"` when absent.
    notifications_rx: channel::Receiver<(String, String)>,
}
1138
#[cfg(any(test, feature = "test-support"))]
impl FakeLanguageServer {
    /// Construct a fake language server.
    ///
    /// Returns the client-side [`LanguageServer`] together with the
    /// [`FakeLanguageServer`] it talks to. The two are connected through a pair
    /// of in-memory pipes, and the fake is pre-configured to answer
    /// `initialize` requests with the given `capabilities` and `name`.
    pub fn new(
        server_id: LanguageServerId,
        binary: LanguageServerBinary,
        name: String,
        capabilities: ServerCapabilities,
        cx: AsyncAppContext,
    ) -> (LanguageServer, FakeLanguageServer) {
        // One pipe per direction; each side gets the writer of one pipe and the
        // reader of the other.
        let (stdin_writer, stdin_reader) = async_pipe::pipe();
        let (stdout_writer, stdout_reader) = async_pipe::pipe();
        let (notifications_tx, notifications_rx) = channel::unbounded();

        // Client side: writes to "stdin", reads from "stdout".
        let mut server = LanguageServer::new_internal(
            server_id,
            stdin_writer,
            stdout_reader,
            None::<async_pipe::PipeReader>,
            Arc::new(Mutex::new(None)),
            None,
            Path::new("/"),
            None,
            cx.clone(),
            |_| {},
        );
        server.name = name.as_str().into();
        let fake = FakeLanguageServer {
            binary,
            server: Arc::new({
                // Server side: the same pipes with the ends swapped.
                let mut server = LanguageServer::new_internal(
                    server_id,
                    stdout_writer,
                    stdin_reader,
                    None::<async_pipe::PipeReader>,
                    Arc::new(Mutex::new(None)),
                    None,
                    Path::new("/"),
                    None,
                    cx,
                    // NOTE(review): presumably invoked for messages without a
                    // registered handler — confirm against `new_internal`.
                    move |msg| {
                        notifications_tx
                            .try_send((
                                msg.method.to_string(),
                                msg.params
                                    .map(|raw_value| raw_value.get())
                                    .unwrap_or("null")
                                    .to_string(),
                            ))
                            .ok();
                    },
                );
                server.name = name.as_str().into();
                server
            }),
            notifications_rx,
        };
        // Answer `initialize` with the requested capabilities and server name.
        // The "responded" receiver returned by `handle_request` is not needed here.
        fake.handle_request::<request::Initialize, _, _>({
            let capabilities = capabilities;
            move |_, _| {
                let capabilities = capabilities.clone();
                let name = name.clone();
                async move {
                    Ok(InitializeResult {
                        capabilities,
                        server_info: Some(ServerInfo {
                            name,
                            ..Default::default()
                        }),
                    })
                }
            }
        });

        (server, fake)
    }
}
1216
#[cfg(any(test, feature = "test-support"))]
impl LanguageServer {
    /// Returns a [`ServerCapabilities`] value for tests with the navigation,
    /// formatting and code-action providers listed below enabled.
    ///
    /// Every other capability keeps its default value.
    pub fn full_capabilities() -> ServerCapabilities {
        ServerCapabilities {
            // Navigation.
            definition_provider: Some(OneOf::Left(true)),
            type_definition_provider: Some(TypeDefinitionProviderCapability::Simple(true)),
            implementation_provider: Some(ImplementationProviderCapability::Simple(true)),
            document_highlight_provider: Some(OneOf::Left(true)),
            // Editing.
            code_action_provider: Some(CodeActionProviderCapability::Simple(true)),
            document_formatting_provider: Some(OneOf::Left(true)),
            document_range_formatting_provider: Some(OneOf::Left(true)),
            ..Default::default()
        }
    }
}
1232
#[cfg(any(test, feature = "test-support"))]
impl FakeLanguageServer {
    /// See [`LanguageServer::notify`].
    ///
    /// Any error from the underlying call is discarded.
    pub fn notify<T: notification::Notification>(&self, params: T::Params) {
        let _ = self.server.notify::<T>(params);
    }

    /// See [`LanguageServer::request`].
    pub async fn request<T>(&self, params: T::Params) -> Result<T::Result>
    where
        T: request::Request,
        T::Result: 'static + Send,
    {
        let server = &self.server;
        server.executor.start_waiting();
        let response = server.request::<T>(params);
        response.await
    }

    /// Waits for the next notification of type `T` via
    /// [`Self::try_receive_notification`].
    ///
    /// # Panics
    ///
    /// Panics if the notification channel closes before one arrives.
    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
        self.server.executor.start_waiting();
        let received = self.try_receive_notification::<T>().await;
        received.unwrap()
    }

    /// Consumes the notification channel until it finds a notification of type `T`.
    ///
    /// Messages with a different method are logged and discarded. Returns
    /// `None` once the channel is closed.
    ///
    /// # Panics
    ///
    /// Panics if the matching notification's params fail to deserialize.
    pub async fn try_receive_notification<T: notification::Notification>(
        &mut self,
    ) -> Option<T::Params> {
        use futures::StreamExt as _;

        while let Some((method, params)) = self.notifications_rx.next().await {
            if method == T::METHOD {
                return Some(serde_json::from_str::<T::Params>(&params).unwrap());
            }
            log::info!("skipping message in fake language server {:?}", params);
        }
        None
    }

    /// Registers a handler for a specific kind of request, replacing any
    /// existing handler for that request type.
    ///
    /// The returned receiver gets a `()` each time the handler's future has
    /// produced a result.
    pub fn handle_request<T, F, Fut>(
        &self,
        mut handler: F,
    ) -> futures::channel::mpsc::UnboundedReceiver<()>
    where
        T: 'static + request::Request,
        T::Params: 'static + Send,
        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> Fut,
        Fut: 'static + Send + Future<Output = Result<T::Result>>,
    {
        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
        self.server.remove_request_handler::<T>();
        let mut subscription = self.server.on_request::<T, _, _>(move |params, cx| {
            let response = handler(params, cx.clone());
            let responded_tx = responded_tx.clone();
            let executor = cx.background_executor().clone();
            async move {
                executor.simulate_random_delay().await;
                let response = response.await;
                responded_tx.unbounded_send(()).ok();
                response
            }
        });
        // Keep the handler registered after this handle goes out of scope.
        subscription.detach();
        responded_rx
    }

    /// Registers a handler for a specific kind of notification, replacing any
    /// existing handler for that notification type.
    ///
    /// The returned receiver gets a `()` after each handled notification.
    pub fn handle_notification<T, F>(
        &self,
        mut handler: F,
    ) -> futures::channel::mpsc::UnboundedReceiver<()>
    where
        T: 'static + notification::Notification,
        T::Params: 'static + Send,
        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext),
    {
        let (handled_tx, handled_rx) = futures::channel::mpsc::unbounded();
        self.server.remove_notification_handler::<T>();
        let mut subscription = self.server.on_notification::<T, _>(move |params, cx| {
            handler(params, cx.clone());
            handled_tx.unbounded_send(()).ok();
        });
        // Keep the handler registered after this handle goes out of scope.
        subscription.detach();
        handled_rx
    }

    /// Removes any existing handler for the specified request type.
    pub fn remove_request_handler<T>(&mut self)
    where
        T: 'static + request::Request,
    {
        let server = &self.server;
        server.remove_request_handler::<T>();
    }

    /// Simulates the server starting work: requests creation of a progress
    /// token, then reports a `Begin` progress notification for it.
    ///
    /// # Panics
    ///
    /// Panics if the `WorkDoneProgressCreate` request fails.
    pub async fn start_progress(&self, token: impl Into<String>) {
        let token: String = token.into();
        let create_params = WorkDoneProgressCreateParams {
            token: NumberOrString::String(token.clone()),
        };
        self.request::<request::WorkDoneProgressCreate>(create_params)
            .await
            .unwrap();

        let begin = WorkDoneProgress::Begin(Default::default());
        self.notify::<notification::Progress>(ProgressParams {
            token: NumberOrString::String(token),
            value: ProgressParamsValue::WorkDone(begin),
        });
    }

    /// Simulates the server finishing work by reporting an `End` progress
    /// notification for the given token.
    pub fn end_progress(&self, token: impl Into<String>) {
        let end = WorkDoneProgress::End(Default::default());
        self.notify::<notification::Progress>(ProgressParams {
            token: NumberOrString::String(token.into()),
            value: ProgressParamsValue::WorkDone(end),
        });
    }
}
1352
1353pub(self) async fn read_headers<Stdout>(
1354    reader: &mut BufReader<Stdout>,
1355    buffer: &mut Vec<u8>,
1356) -> Result<()>
1357where
1358    Stdout: AsyncRead + Unpin + Send + 'static,
1359{
1360    loop {
1361        if buffer.len() >= HEADER_DELIMITER.len()
1362            && buffer[(buffer.len() - HEADER_DELIMITER.len())..] == HEADER_DELIMITER[..]
1363        {
1364            return Ok(());
1365        }
1366
1367        if reader.read_until(b'\n', buffer).await? == 0 {
1368            return Err(anyhow!("cannot read LSP message headers"));
1369        }
1370    }
1371}
1372
#[cfg(test)]
mod tests {
    use super::*;
    use gpui::TestAppContext;

    /// Initializes logging for the test binary, but only when `RUST_LOG` is set.
    #[ctor::ctor]
    fn init_logger() {
        if std::env::var("RUST_LOG").is_ok() {
            env_logger::init();
        }
    }

    /// Round-trips notifications in both directions between a `LanguageServer`
    /// and its `FakeLanguageServer`, then checks that dropping the server
    /// sends an `exit` notification.
    #[gpui::test]
    async fn test_fake(cx: &mut TestAppContext) {
        cx.update(|cx| {
            release_channel::init("0.0.0", cx);
        });
        let (server, mut fake) = FakeLanguageServer::new(
            LanguageServerId(0),
            LanguageServerBinary {
                path: "path/to/language-server".into(),
                arguments: vec![],
                env: None,
            },
            "the-lsp".to_string(),
            Default::default(),
            cx.to_async(),
        );

        let (message_tx, message_rx) = channel::unbounded();
        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
        server
            .on_notification::<notification::ShowMessage, _>(move |params, _| {
                message_tx.try_send(params).unwrap()
            })
            .detach();
        server
            .on_notification::<notification::PublishDiagnostics, _>(move |params, _| {
                diagnostics_tx.try_send(params).unwrap()
            })
            .detach();

        // Client -> server notification.
        let server = cx.update(|cx| server.initialize(None, cx)).await.unwrap();
        server
            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
                text_document: TextDocumentItem::new(
                    Url::from_str("file://a/b").unwrap(),
                    "rust".to_string(),
                    0,
                    "".to_string(),
                ),
            })
            .unwrap();
        assert_eq!(
            fake.receive_notification::<notification::DidOpenTextDocument>()
                .await
                .text_document
                .uri
                .as_str(),
            "file://a/b"
        );

        // Server -> client notifications.
        fake.notify::<notification::ShowMessage>(ShowMessageParams {
            typ: MessageType::ERROR,
            message: "ok".to_string(),
        });
        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
            uri: Url::from_str("file://b/c").unwrap(),
            version: Some(5),
            diagnostics: vec![],
        });
        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
        assert_eq!(
            diagnostics_rx.recv().await.unwrap().uri.as_str(),
            "file://b/c"
        );

        fake.handle_request::<request::Shutdown, _, _>(|_, _| async move { Ok(()) });

        // Dropping the server triggers its shutdown sequence.
        drop(server);
        fake.receive_notification::<notification::Exit>().await;
    }

    /// `read_headers` must stop right after the blank line that ends the
    /// headers, whatever the header order, and leave the body unread.
    #[gpui::test]
    async fn test_read_headers() {
        let mut buf = Vec::new();
        let mut reader = smol::io::BufReader::new(b"Content-Length: 123\r\n\r\n" as &[u8]);
        read_headers(&mut reader, &mut buf).await.unwrap();
        assert_eq!(buf, b"Content-Length: 123\r\n\r\n");

        let mut buf = Vec::new();
        let mut reader = smol::io::BufReader::new(b"Content-Type: application/vscode-jsonrpc\r\nContent-Length: 1235\r\n\r\n{\"somecontent\":123}" as &[u8]);
        read_headers(&mut reader, &mut buf).await.unwrap();
        assert_eq!(
            buf,
            b"Content-Type: application/vscode-jsonrpc\r\nContent-Length: 1235\r\n\r\n"
        );

        let mut buf = Vec::new();
        let mut reader = smol::io::BufReader::new(b"Content-Length: 1235\r\nContent-Type: application/vscode-jsonrpc\r\n\r\n{\"somecontent\":true}" as &[u8]);
        read_headers(&mut reader, &mut buf).await.unwrap();
        assert_eq!(
            buf,
            b"Content-Length: 1235\r\nContent-Type: application/vscode-jsonrpc\r\n\r\n"
        );
    }

    /// A string id that happens to contain digits must stay a string id.
    #[gpui::test]
    fn test_deserialize_string_digit_id() {
        let json = r#"{"jsonrpc":"2.0","id":"2","method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
        let notification = serde_json::from_str::<AnyNotification>(json)
            .expect("message with string id should be parsed");
        let expected_id = RequestId::Str("2".to_string());
        assert_eq!(notification.id, Some(expected_id));
    }

    /// An arbitrary string id is accepted.
    #[gpui::test]
    fn test_deserialize_string_id() {
        let json = r#"{"jsonrpc":"2.0","id":"anythingAtAll","method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
        let notification = serde_json::from_str::<AnyNotification>(json)
            .expect("message with string id should be parsed");
        let expected_id = RequestId::Str("anythingAtAll".to_string());
        assert_eq!(notification.id, Some(expected_id));
    }

    /// An integer id is accepted and parsed as `RequestId::Int`.
    #[gpui::test]
    fn test_deserialize_int_id() {
        let json = r#"{"jsonrpc":"2.0","id":2,"method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
        let notification = serde_json::from_str::<AnyNotification>(json)
            .expect("message with integer id should be parsed");
        let expected_id = RequestId::Int(2);
        assert_eq!(notification.id, Some(expected_id));
    }
}