lsp.rs

   1use log::warn;
   2pub use lsp_types::request::*;
   3pub use lsp_types::*;
   4
   5use anyhow::{anyhow, Context, Result};
   6use collections::HashMap;
   7use futures::{channel::oneshot, io::BufWriter, AsyncRead, AsyncWrite, FutureExt};
   8use gpui::{AppContext, AsyncAppContext, BackgroundExecutor, Task};
   9use parking_lot::Mutex;
  10use postage::{barrier, prelude::Stream};
  11use serde::{de::DeserializeOwned, Deserialize, Serialize};
  12use serde_json::{json, value::RawValue, Value};
  13use smol::{
  14    channel,
  15    io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
  16    process::{self, Child},
  17};
  18use std::{
  19    ffi::OsString,
  20    fmt,
  21    future::Future,
  22    io::Write,
  23    path::PathBuf,
  24    str::{self, FromStr as _},
  25    sync::{
  26        atomic::{AtomicI32, Ordering::SeqCst},
  27        Arc, Weak,
  28    },
  29    time::{Duration, Instant},
  30};
  31use std::{path::Path, process::Stdio};
  32use util::{ResultExt, TryFutureExt};
  33
  34const HEADER_DELIMITER: &'static [u8; 4] = b"\r\n\r\n";
  35const JSON_RPC_VERSION: &str = "2.0";
  36const CONTENT_LEN_HEADER: &str = "Content-Length: ";
  37const LSP_REQUEST_TIMEOUT: Duration = Duration::from_secs(60 * 2);
  38
/// Callback stored per method name. `handle_input` calls it with the incoming message's
/// optional `id`, the raw JSON text of its `params` (`"null"` when absent), and a clone of the
/// async app context.
type NotificationHandler = Box<dyn Send + FnMut(Option<RequestId>, &str, AsyncAppContext)>;
/// One-shot callback stored per request id. `handle_input` calls it with either the raw JSON
/// text of the response's result or the error the response carried.
type ResponseHandler = Box<dyn Send + FnOnce(Result<String, Error>)>;
/// Callback that observes server traffic: it receives the stream kind and the message text.
type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
  42
/// Kind of language server stdio given to an IO handler.
#[derive(Debug, Clone, Copy)]
pub enum IoKind {
    /// A message read from the server's stdout (reported by `handle_input`).
    StdOut,
    /// A message about to be written to the server's stdin (reported by `handle_output`).
    StdIn,
    /// A line read from the server's stderr (reported by `handle_stderr`).
    StdErr,
}
  50
/// Represents a launchable language server. This can either be a standalone binary or the path
/// to a runtime with arguments to instruct it to launch the actual language server file.
#[derive(Debug, Clone, Deserialize)]
pub struct LanguageServerBinary {
    /// Program to execute; `LanguageServer::new` passes it to `process::Command::new`.
    pub path: PathBuf,
    /// Command-line arguments handed to the program, in order.
    pub arguments: Vec<OsString>,
}
  58
/// A running language server process.
///
/// Bundles the outgoing message channel, the handler registries shared with the I/O tasks, and
/// the tasks themselves.
pub struct LanguageServer {
    /// Identifier supplied by the caller at construction.
    server_id: LanguageServerId,
    /// Counter used to allocate ids; `on_io` draws IO-subscription ids from it and `shutdown`
    /// passes a copy of its current value to `request_internal`.
    next_id: AtomicI32,
    /// Queue of serialized messages that `handle_output` writes to the server's stdin.
    outbound_tx: channel::Sender<String>,
    /// Server name: the binary's file name after `new`, replaced by the name from the
    /// `Initialize` response when the server reports one.
    name: String,
    /// Capabilities taken from the `Initialize` response; default-valued before that.
    capabilities: ServerCapabilities,
    /// Code action kinds supplied at construction.
    code_action_kinds: Option<Vec<CodeActionKind>>,
    /// Handlers for incoming messages that carry a `method`, keyed by method name. Both
    /// notification handlers and request handlers are registered here.
    notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
    /// Callbacks awaiting a response, keyed by request id. The map is replaced with `None` when
    /// `handle_input` or `handle_output` exits and during `shutdown`.
    response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
    /// Observers of all stdio traffic, keyed by the id handed out in `on_io`.
    io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
    /// Clone of the background executor taken from the context at construction.
    executor: BackgroundExecutor,
    /// The (input, output) task pair; `shutdown` takes it, so it is `None` afterwards.
    #[allow(clippy::type_complexity)]
    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
    /// Receiving side of the barrier whose sender `handle_output` drops once its write loop has
    /// ended; taken by `shutdown`.
    output_done_rx: Mutex<Option<barrier::Receiver>>,
    /// Root path given at construction; `initialize` converts it into the root URI.
    root_path: PathBuf,
    /// Child process handle when the server was spawned by `new` (which sets
    /// `kill_on_drop(true)`); `None` when `new_internal` was called without a process.
    _server: Option<Mutex<Child>>,
}
  77
/// Identifies a running language server.
///
/// A newtype over `usize`; `#[repr(transparent)]` gives it the same layout as the wrapped
/// integer.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct LanguageServerId(pub usize);
  82
/// Handle to a language server RPC activity subscription.
///
/// NOTE(review): how a subscription is cancelled or detached is implemented outside this part
/// of the file; presumably the `Option` wrappers exist so the map reference can be taken out.
pub enum Subscription {
    /// Refers to the handler registered under `method`.
    Notification {
        /// Method name the handler was registered for.
        method: &'static str,
        /// Strong reference to the map that holds the handler.
        notification_handlers: Option<Arc<Mutex<HashMap<&'static str, NotificationHandler>>>>,
    },
    /// Refers to an IO handler registered through `LanguageServer::on_io`.
    Io {
        /// Id under which the handler was inserted.
        id: i32,
        /// Weak reference to the map that holds the handler, so the subscription does not keep
        /// the map alive.
        io_handlers: Option<Weak<Mutex<HashMap<i32, IoHandler>>>>,
    },
}
  94
/// Language server protocol RPC request message ID.
///
/// Because of `#[serde(untagged)]` the id is written as a bare JSON number or string, and on
/// input the variants are tried in declaration order.
///
/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(untagged)]
pub enum RequestId {
    /// Numeric id.
    Int(i32),
    /// String id.
    Str(String),
}
 104
/// Language server protocol RPC request message.
///
/// Generic over the parameter payload `T`; the method name is borrowed for `'a`.
///
/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
#[derive(Serialize, Deserialize)]
pub struct Request<'a, T> {
    // JSON-RPC protocol version string.
    jsonrpc: &'static str,
    // Id that the matching response is expected to echo back.
    id: RequestId,
    // Name of the method being invoked.
    method: &'a str,
    // Method-specific parameters.
    params: T,
}
 115
/// Language server protocol RPC request response message before it is deserialized into a concrete type.
///
/// `handle_input` parses incoming bodies into this shape when they are not method-carrying
/// messages, then hands `error` or the raw `result` text to the handler stored under `id`.
#[derive(Serialize, Deserialize)]
struct AnyResponse<'a> {
    // JSON-RPC protocol version string, borrowed from the message body.
    jsonrpc: &'a str,
    // Id of the request this response answers.
    id: RequestId,
    // Error reported by the server; `None` when the field is missing.
    #[serde(default)]
    error: Option<Error>,
    // Unparsed JSON of the result, borrowed from the message body.
    #[serde(borrow)]
    result: Option<&'a RawValue>,
}
 126
 127/// Language server protocol RPC request response message.
 128///
 129/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#responseMessage)
 130#[derive(Serialize)]
 131struct Response<T> {
 132    jsonrpc: &'static str,
 133    id: RequestId,
 134    result: Option<T>,
 135    error: Option<Error>,
 136}
 137
/// Language server protocol RPC notification message.
///
/// Same shape as a request but without an `id`; generic over the parameter payload `T`.
///
/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
#[derive(Serialize, Deserialize)]
struct Notification<'a, T> {
    // JSON-RPC protocol version string.
    jsonrpc: &'static str,
    // Name of the notification method, borrowed from the input when deserializing.
    #[serde(borrow)]
    method: &'a str,
    // Method-specific parameters.
    params: T,
}
 148
/// Language server RPC notification message before it is deserialized into a concrete type.
///
/// `handle_input` tries this shape first for every incoming body, so it also matches requests
/// sent by the server: those carry an `id`, which is forwarded to the registered handler.
#[derive(Debug, Clone, Deserialize)]
struct AnyNotification<'a> {
    // Present when the message carries an id; `None` when the field is missing.
    #[serde(default)]
    id: Option<RequestId>,
    // Method name, borrowed from the message body; used to look up the handler.
    #[serde(borrow)]
    method: &'a str,
    // Unparsed JSON of the parameters, borrowed from the message body; `None` when missing.
    #[serde(borrow, default)]
    params: Option<&'a RawValue>,
}
 159
/// Error payload of a response message. Only the `message` text is modelled here.
#[derive(Debug, Serialize, Deserialize)]
struct Error {
    // Human-readable description of the failure.
    message: String,
}
 164
 165impl LanguageServer {
 166    /// Starts a language server process.
 167    pub fn new(
 168        stderr_capture: Arc<Mutex<Option<String>>>,
 169        server_id: LanguageServerId,
 170        binary: LanguageServerBinary,
 171        root_path: &Path,
 172        code_action_kinds: Option<Vec<CodeActionKind>>,
 173        cx: AsyncAppContext,
 174    ) -> Result<Self> {
 175        let working_dir = if root_path.is_dir() {
 176            root_path
 177        } else {
 178            root_path.parent().unwrap_or_else(|| Path::new("/"))
 179        };
 180
 181        let mut server = process::Command::new(&binary.path)
 182            .current_dir(working_dir)
 183            .args(binary.arguments)
 184            .stdin(Stdio::piped())
 185            .stdout(Stdio::piped())
 186            .stderr(Stdio::piped())
 187            .kill_on_drop(true)
 188            .spawn()?;
 189
 190        let stdin = server.stdin.take().unwrap();
 191        let stdout = server.stdout.take().unwrap();
 192        let stderr = server.stderr.take().unwrap();
 193        let mut server = Self::new_internal(
 194            server_id.clone(),
 195            stdin,
 196            stdout,
 197            Some(stderr),
 198            stderr_capture,
 199            Some(server),
 200            root_path,
 201            code_action_kinds,
 202            cx,
 203            move |notification| {
 204                log::info!(
 205                    "{} unhandled notification {}:\n{}",
 206                    server_id,
 207                    notification.method,
 208                    serde_json::to_string_pretty(
 209                        &notification
 210                            .params
 211                            .and_then(|params| Value::from_str(params.get()).ok())
 212                            .unwrap_or(Value::Null)
 213                    )
 214                    .unwrap(),
 215                );
 216            },
 217        );
 218
 219        if let Some(name) = binary.path.file_name() {
 220            server.name = name.to_string_lossy().to_string();
 221        }
 222
 223        Ok(server)
 224    }
 225
 226    fn new_internal<Stdin, Stdout, Stderr, F>(
 227        server_id: LanguageServerId,
 228        stdin: Stdin,
 229        stdout: Stdout,
 230        stderr: Option<Stderr>,
 231        stderr_capture: Arc<Mutex<Option<String>>>,
 232        server: Option<Child>,
 233        root_path: &Path,
 234        code_action_kinds: Option<Vec<CodeActionKind>>,
 235        cx: AsyncAppContext,
 236        on_unhandled_notification: F,
 237    ) -> Self
 238    where
 239        Stdin: AsyncWrite + Unpin + Send + 'static,
 240        Stdout: AsyncRead + Unpin + Send + 'static,
 241        Stderr: AsyncRead + Unpin + Send + 'static,
 242        F: FnMut(AnyNotification) + 'static + Send + Sync + Clone,
 243    {
 244        let (outbound_tx, outbound_rx) = channel::unbounded::<String>();
 245        let (output_done_tx, output_done_rx) = barrier::channel();
 246        let notification_handlers =
 247            Arc::new(Mutex::new(HashMap::<_, NotificationHandler>::default()));
 248        let response_handlers =
 249            Arc::new(Mutex::new(Some(HashMap::<_, ResponseHandler>::default())));
 250        let io_handlers = Arc::new(Mutex::new(HashMap::default()));
 251
 252        let stdout_input_task = cx.spawn({
 253            let on_unhandled_notification = on_unhandled_notification.clone();
 254            let notification_handlers = notification_handlers.clone();
 255            let response_handlers = response_handlers.clone();
 256            let io_handlers = io_handlers.clone();
 257            move |cx| {
 258                Self::handle_input(
 259                    stdout,
 260                    on_unhandled_notification,
 261                    notification_handlers,
 262                    response_handlers,
 263                    io_handlers,
 264                    cx,
 265                )
 266                .log_err()
 267            }
 268        });
 269        let stderr_input_task = stderr
 270            .map(|stderr| {
 271                let io_handlers = io_handlers.clone();
 272                let stderr_captures = stderr_capture.clone();
 273                cx.spawn(|_| Self::handle_stderr(stderr, io_handlers, stderr_captures).log_err())
 274            })
 275            .unwrap_or_else(|| Task::Ready(Some(None)));
 276        let input_task = cx.spawn(|_| async move {
 277            let (stdout, stderr) = futures::join!(stdout_input_task, stderr_input_task);
 278            stdout.or(stderr)
 279        });
 280        let output_task = cx.background_executor().spawn({
 281            Self::handle_output(
 282                stdin,
 283                outbound_rx,
 284                output_done_tx,
 285                response_handlers.clone(),
 286                io_handlers.clone(),
 287            )
 288            .log_err()
 289        });
 290
 291        Self {
 292            server_id,
 293            notification_handlers,
 294            response_handlers,
 295            io_handlers,
 296            name: Default::default(),
 297            capabilities: Default::default(),
 298            code_action_kinds,
 299            next_id: Default::default(),
 300            outbound_tx,
 301            executor: cx.background_executor().clone(),
 302            io_tasks: Mutex::new(Some((input_task, output_task))),
 303            output_done_rx: Mutex::new(Some(output_done_rx)),
 304            root_path: root_path.to_path_buf(),
 305            _server: server.map(|server| Mutex::new(server)),
 306        }
 307    }
 308
 309    /// List of code action kinds this language server reports being able to emit.
 310    pub fn code_action_kinds(&self) -> Option<Vec<CodeActionKind>> {
 311        self.code_action_kinds.clone()
 312    }
 313
 314    async fn handle_input<Stdout, F>(
 315        stdout: Stdout,
 316        mut on_unhandled_notification: F,
 317        notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
 318        response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
 319        io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 320        cx: AsyncAppContext,
 321    ) -> anyhow::Result<()>
 322    where
 323        Stdout: AsyncRead + Unpin + Send + 'static,
 324        F: FnMut(AnyNotification) + 'static + Send,
 325    {
 326        let mut stdout = BufReader::new(stdout);
 327        let _clear_response_handlers = util::defer({
 328            let response_handlers = response_handlers.clone();
 329            move || {
 330                response_handlers.lock().take();
 331            }
 332        });
 333        let mut buffer = Vec::new();
 334        loop {
 335            buffer.clear();
 336
 337            read_headers(&mut stdout, &mut buffer).await?;
 338
 339            let headers = std::str::from_utf8(&buffer)?;
 340
 341            let message_len = headers
 342                .split("\n")
 343                .find(|line| line.starts_with(CONTENT_LEN_HEADER))
 344                .and_then(|line| line.strip_prefix(CONTENT_LEN_HEADER))
 345                .ok_or_else(|| anyhow!("invalid LSP message header {headers:?}"))?
 346                .trim_end()
 347                .parse()?;
 348
 349            buffer.resize(message_len, 0);
 350            stdout.read_exact(&mut buffer).await?;
 351
 352            if let Ok(message) = str::from_utf8(&buffer) {
 353                log::trace!("incoming message: {message}");
 354                for handler in io_handlers.lock().values_mut() {
 355                    handler(IoKind::StdOut, message);
 356                }
 357            }
 358
 359            if let Ok(msg) = serde_json::from_slice::<AnyNotification>(&buffer) {
 360                if let Some(handler) = notification_handlers.lock().get_mut(msg.method) {
 361                    handler(
 362                        msg.id,
 363                        msg.params.map(|params| params.get()).unwrap_or("null"),
 364                        cx.clone(),
 365                    );
 366                } else {
 367                    on_unhandled_notification(msg);
 368                }
 369            } else if let Ok(AnyResponse {
 370                id, error, result, ..
 371            }) = serde_json::from_slice(&buffer)
 372            {
 373                if let Some(handler) = response_handlers
 374                    .lock()
 375                    .as_mut()
 376                    .and_then(|handlers| handlers.remove(&id))
 377                {
 378                    if let Some(error) = error {
 379                        handler(Err(error));
 380                    } else if let Some(result) = result {
 381                        handler(Ok(result.get().into()));
 382                    } else {
 383                        handler(Ok("null".into()));
 384                    }
 385                }
 386            } else {
 387                warn!(
 388                    "failed to deserialize LSP message:\n{}",
 389                    std::str::from_utf8(&buffer)?
 390                );
 391            }
 392
 393            // Don't starve the main thread when receiving lots of messages at once.
 394            smol::future::yield_now().await;
 395        }
 396    }
 397
 398    async fn handle_stderr<Stderr>(
 399        stderr: Stderr,
 400        io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 401        stderr_capture: Arc<Mutex<Option<String>>>,
 402    ) -> anyhow::Result<()>
 403    where
 404        Stderr: AsyncRead + Unpin + Send + 'static,
 405    {
 406        let mut stderr = BufReader::new(stderr);
 407        let mut buffer = Vec::new();
 408
 409        loop {
 410            buffer.clear();
 411
 412            let bytes_read = stderr.read_until(b'\n', &mut buffer).await?;
 413            if bytes_read == 0 {
 414                return Ok(());
 415            }
 416
 417            if let Ok(message) = str::from_utf8(&buffer) {
 418                log::trace!("incoming stderr message:{message}");
 419                for handler in io_handlers.lock().values_mut() {
 420                    handler(IoKind::StdErr, message);
 421                }
 422
 423                if let Some(stderr) = stderr_capture.lock().as_mut() {
 424                    stderr.push_str(message);
 425                }
 426            }
 427
 428            // Don't starve the main thread when receiving lots of messages at once.
 429            smol::future::yield_now().await;
 430        }
 431    }
 432
 433    async fn handle_output<Stdin>(
 434        stdin: Stdin,
 435        outbound_rx: channel::Receiver<String>,
 436        output_done_tx: barrier::Sender,
 437        response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
 438        io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 439    ) -> anyhow::Result<()>
 440    where
 441        Stdin: AsyncWrite + Unpin + Send + 'static,
 442    {
 443        let mut stdin = BufWriter::new(stdin);
 444        let _clear_response_handlers = util::defer({
 445            let response_handlers = response_handlers.clone();
 446            move || {
 447                response_handlers.lock().take();
 448            }
 449        });
 450        let mut content_len_buffer = Vec::new();
 451        while let Ok(message) = outbound_rx.recv().await {
 452            log::trace!("outgoing message:{}", message);
 453            for handler in io_handlers.lock().values_mut() {
 454                handler(IoKind::StdIn, &message);
 455            }
 456
 457            content_len_buffer.clear();
 458            write!(content_len_buffer, "{}", message.len()).unwrap();
 459            stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
 460            stdin.write_all(&content_len_buffer).await?;
 461            stdin.write_all("\r\n\r\n".as_bytes()).await?;
 462            stdin.write_all(message.as_bytes()).await?;
 463            stdin.flush().await?;
 464        }
 465        drop(output_done_tx);
 466        Ok(())
 467    }
 468
 469    /// Initializes a language server by sending the `Initialize` request.
 470    /// Note that `options` is used directly to construct [`InitializeParams`], which is why it is owned.
 471    ///
 472    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#initialize)
 473    pub fn initialize(
 474        mut self,
 475        options: Option<Value>,
 476        cx: &AppContext,
 477    ) -> Task<Result<Arc<Self>>> {
 478        let root_uri = Url::from_file_path(&self.root_path).unwrap();
 479        #[allow(deprecated)]
 480        let params = InitializeParams {
 481            process_id: None,
 482            root_path: None,
 483            root_uri: Some(root_uri.clone()),
 484            initialization_options: options,
 485            capabilities: ClientCapabilities {
 486                workspace: Some(WorkspaceClientCapabilities {
 487                    configuration: Some(true),
 488                    did_change_watched_files: Some(DidChangeWatchedFilesClientCapabilities {
 489                        dynamic_registration: Some(true),
 490                        relative_pattern_support: Some(true),
 491                    }),
 492                    did_change_configuration: Some(DynamicRegistrationClientCapabilities {
 493                        dynamic_registration: Some(true),
 494                    }),
 495                    workspace_folders: Some(true),
 496                    symbol: Some(WorkspaceSymbolClientCapabilities {
 497                        resolve_support: None,
 498                        ..WorkspaceSymbolClientCapabilities::default()
 499                    }),
 500                    inlay_hint: Some(InlayHintWorkspaceClientCapabilities {
 501                        refresh_support: Some(true),
 502                    }),
 503                    diagnostic: Some(DiagnosticWorkspaceClientCapabilities {
 504                        refresh_support: None,
 505                    }),
 506                    workspace_edit: Some(WorkspaceEditClientCapabilities {
 507                        resource_operations: Some(vec![
 508                            ResourceOperationKind::Create,
 509                            ResourceOperationKind::Rename,
 510                            ResourceOperationKind::Delete,
 511                        ]),
 512                        document_changes: Some(true),
 513                        ..WorkspaceEditClientCapabilities::default()
 514                    }),
 515                    ..Default::default()
 516                }),
 517                text_document: Some(TextDocumentClientCapabilities {
 518                    definition: Some(GotoCapability {
 519                        link_support: Some(true),
 520                        dynamic_registration: None,
 521                    }),
 522                    code_action: Some(CodeActionClientCapabilities {
 523                        code_action_literal_support: Some(CodeActionLiteralSupport {
 524                            code_action_kind: CodeActionKindLiteralSupport {
 525                                value_set: vec![
 526                                    CodeActionKind::REFACTOR.as_str().into(),
 527                                    CodeActionKind::QUICKFIX.as_str().into(),
 528                                    CodeActionKind::SOURCE.as_str().into(),
 529                                ],
 530                            },
 531                        }),
 532                        data_support: Some(true),
 533                        resolve_support: Some(CodeActionCapabilityResolveSupport {
 534                            properties: vec!["edit".to_string(), "command".to_string()],
 535                        }),
 536                        ..Default::default()
 537                    }),
 538                    completion: Some(CompletionClientCapabilities {
 539                        completion_item: Some(CompletionItemCapability {
 540                            snippet_support: Some(true),
 541                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
 542                                properties: vec![
 543                                    "documentation".to_string(),
 544                                    "additionalTextEdits".to_string(),
 545                                ],
 546                            }),
 547                            ..Default::default()
 548                        }),
 549                        completion_list: Some(CompletionListCapability {
 550                            item_defaults: Some(vec![
 551                                "commitCharacters".to_owned(),
 552                                "editRange".to_owned(),
 553                                "insertTextMode".to_owned(),
 554                                "data".to_owned(),
 555                            ]),
 556                        }),
 557                        ..Default::default()
 558                    }),
 559                    rename: Some(RenameClientCapabilities {
 560                        prepare_support: Some(true),
 561                        ..Default::default()
 562                    }),
 563                    hover: Some(HoverClientCapabilities {
 564                        content_format: Some(vec![MarkupKind::Markdown]),
 565                        dynamic_registration: None,
 566                    }),
 567                    inlay_hint: Some(InlayHintClientCapabilities {
 568                        resolve_support: Some(InlayHintResolveClientCapabilities {
 569                            properties: vec![
 570                                "textEdits".to_string(),
 571                                "tooltip".to_string(),
 572                                "label.tooltip".to_string(),
 573                                "label.location".to_string(),
 574                                "label.command".to_string(),
 575                            ],
 576                        }),
 577                        dynamic_registration: Some(false),
 578                    }),
 579                    publish_diagnostics: Some(PublishDiagnosticsClientCapabilities {
 580                        related_information: Some(true),
 581                        ..Default::default()
 582                    }),
 583                    formatting: Some(DynamicRegistrationClientCapabilities {
 584                        dynamic_registration: None,
 585                    }),
 586                    on_type_formatting: Some(DynamicRegistrationClientCapabilities {
 587                        dynamic_registration: None,
 588                    }),
 589                    diagnostic: Some(DiagnosticClientCapabilities {
 590                        related_document_support: Some(true),
 591                        dynamic_registration: None,
 592                    }),
 593                    ..Default::default()
 594                }),
 595                experimental: Some(json!({
 596                    "serverStatusNotification": true,
 597                })),
 598                window: Some(WindowClientCapabilities {
 599                    work_done_progress: Some(true),
 600                    ..Default::default()
 601                }),
 602                general: None,
 603            },
 604            trace: None,
 605            workspace_folders: Some(vec![WorkspaceFolder {
 606                uri: root_uri,
 607                name: Default::default(),
 608            }]),
 609            client_info: Some(ClientInfo {
 610                name: release_channel::ReleaseChannel::global(cx)
 611                    .display_name()
 612                    .to_string(),
 613                version: Some(release_channel::AppVersion::global(cx).to_string()),
 614            }),
 615            locale: None,
 616        };
 617
 618        cx.spawn(|_| async move {
 619            let response = self.request::<request::Initialize>(params).await?;
 620            if let Some(info) = response.server_info {
 621                self.name = info.name;
 622            }
 623            self.capabilities = response.capabilities;
 624
 625            self.notify::<notification::Initialized>(InitializedParams {})?;
 626            Ok(Arc::new(self))
 627        })
 628    }
 629
 630    /// Sends a shutdown request to the language server process and prepares the [`LanguageServer`] to be dropped.
 631    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
 632        if let Some(tasks) = self.io_tasks.lock().take() {
 633            let response_handlers = self.response_handlers.clone();
 634            let next_id = AtomicI32::new(self.next_id.load(SeqCst));
 635            let outbound_tx = self.outbound_tx.clone();
 636            let executor = self.executor.clone();
 637            let mut output_done = self.output_done_rx.lock().take().unwrap();
 638            let shutdown_request = Self::request_internal::<request::Shutdown>(
 639                &next_id,
 640                &response_handlers,
 641                &outbound_tx,
 642                &executor,
 643                (),
 644            );
 645            let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
 646            outbound_tx.close();
 647            Some(
 648                async move {
 649                    log::debug!("language server shutdown started");
 650                    shutdown_request.await?;
 651                    response_handlers.lock().take();
 652                    exit?;
 653                    output_done.recv().await;
 654                    log::debug!("language server shutdown finished");
 655                    drop(tasks);
 656                    anyhow::Ok(())
 657                }
 658                .log_err(),
 659            )
 660        } else {
 661            None
 662        }
 663    }
 664
 665    /// Register a handler to handle incoming LSP notifications.
 666    ///
 667    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
 668    #[must_use]
 669    pub fn on_notification<T, F>(&self, f: F) -> Subscription
 670    where
 671        T: notification::Notification,
 672        F: 'static + Send + FnMut(T::Params, AsyncAppContext),
 673    {
 674        self.on_custom_notification(T::METHOD, f)
 675    }
 676
 677    /// Register a handler to handle incoming LSP requests.
 678    ///
 679    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
 680    #[must_use]
 681    pub fn on_request<T, F, Fut>(&self, f: F) -> Subscription
 682    where
 683        T: request::Request,
 684        T::Params: 'static + Send,
 685        F: 'static + FnMut(T::Params, AsyncAppContext) -> Fut + Send,
 686        Fut: 'static + Future<Output = Result<T::Result>>,
 687    {
 688        self.on_custom_request(T::METHOD, f)
 689    }
 690
 691    /// Registers a handler to inspect all language server process stdio.
 692    #[must_use]
 693    pub fn on_io<F>(&self, f: F) -> Subscription
 694    where
 695        F: 'static + Send + FnMut(IoKind, &str),
 696    {
 697        let id = self.next_id.fetch_add(1, SeqCst);
 698        self.io_handlers.lock().insert(id, Box::new(f));
 699        Subscription::Io {
 700            id,
 701            io_handlers: Some(Arc::downgrade(&self.io_handlers)),
 702        }
 703    }
 704
 705    /// Removes a request handler registers via [`Self::on_request`].
 706    pub fn remove_request_handler<T: request::Request>(&self) {
 707        self.notification_handlers.lock().remove(T::METHOD);
 708    }
 709
 710    /// Removes a notification handler registers via [`Self::on_notification`].
 711    pub fn remove_notification_handler<T: notification::Notification>(&self) {
 712        self.notification_handlers.lock().remove(T::METHOD);
 713    }
 714
 715    /// Checks if a notification handler has been registered via [`Self::on_notification`].
 716    pub fn has_notification_handler<T: notification::Notification>(&self) -> bool {
 717        self.notification_handlers.lock().contains_key(T::METHOD)
 718    }
 719
 720    #[must_use]
 721    fn on_custom_notification<Params, F>(&self, method: &'static str, mut f: F) -> Subscription
 722    where
 723        F: 'static + FnMut(Params, AsyncAppContext) + Send,
 724        Params: DeserializeOwned,
 725    {
 726        let prev_handler = self.notification_handlers.lock().insert(
 727            method,
 728            Box::new(move |_, params, cx| {
 729                if let Some(params) = serde_json::from_str(params).log_err() {
 730                    f(params, cx);
 731                }
 732            }),
 733        );
 734        assert!(
 735            prev_handler.is_none(),
 736            "registered multiple handlers for the same LSP method"
 737        );
 738        Subscription::Notification {
 739            method,
 740            notification_handlers: Some(self.notification_handlers.clone()),
 741        }
 742    }
 743
 744    #[must_use]
 745    fn on_custom_request<Params, Res, Fut, F>(&self, method: &'static str, mut f: F) -> Subscription
 746    where
 747        F: 'static + FnMut(Params, AsyncAppContext) -> Fut + Send,
 748        Fut: 'static + Future<Output = Result<Res>>,
 749        Params: DeserializeOwned + Send + 'static,
 750        Res: Serialize,
 751    {
 752        let outbound_tx = self.outbound_tx.clone();
 753        let prev_handler = self.notification_handlers.lock().insert(
 754            method,
 755            Box::new(move |id, params, cx| {
 756                if let Some(id) = id {
 757                    match serde_json::from_str(params) {
 758                        Ok(params) => {
 759                            let response = f(params, cx.clone());
 760                            cx.foreground_executor()
 761                                .spawn({
 762                                    let outbound_tx = outbound_tx.clone();
 763                                    async move {
 764                                        let response = match response.await {
 765                                            Ok(result) => Response {
 766                                                jsonrpc: JSON_RPC_VERSION,
 767                                                id,
 768                                                result: Some(result),
 769                                                error: None,
 770                                            },
 771                                            Err(error) => Response {
 772                                                jsonrpc: JSON_RPC_VERSION,
 773                                                id,
 774                                                result: None,
 775                                                error: Some(Error {
 776                                                    message: error.to_string(),
 777                                                }),
 778                                            },
 779                                        };
 780                                        if let Some(response) =
 781                                            serde_json::to_string(&response).log_err()
 782                                        {
 783                                            outbound_tx.try_send(response).ok();
 784                                        }
 785                                    }
 786                                })
 787                                .detach();
 788                        }
 789
 790                        Err(error) => {
 791                            log::error!(
 792                                "error deserializing {} request: {:?}, message: {:?}",
 793                                method,
 794                                error,
 795                                params
 796                            );
 797                            let response = AnyResponse {
 798                                jsonrpc: JSON_RPC_VERSION,
 799                                id,
 800                                result: None,
 801                                error: Some(Error {
 802                                    message: error.to_string(),
 803                                }),
 804                            };
 805                            if let Some(response) = serde_json::to_string(&response).log_err() {
 806                                outbound_tx.try_send(response).ok();
 807                            }
 808                        }
 809                    }
 810                }
 811            }),
 812        );
 813        assert!(
 814            prev_handler.is_none(),
 815            "registered multiple handlers for the same LSP method"
 816        );
 817        Subscription::Notification {
 818            method,
 819            notification_handlers: Some(self.notification_handlers.clone()),
 820        }
 821    }
 822
    /// Get the name of the running language server.
    ///
    /// Returns a borrow of the `name` field; nothing is copied.
    pub fn name(&self) -> &str {
        &self.name
    }
 827
    /// Get the reported capabilities of the running language server.
    ///
    /// Returns a borrow of the `capabilities` field; nothing is copied.
    pub fn capabilities(&self) -> &ServerCapabilities {
        &self.capabilities
    }
 832
    /// Get the id of the running language server.
    ///
    /// The id is returned by value.
    pub fn server_id(&self) -> LanguageServerId {
        self.server_id
    }
 837
    /// Get the root path of the project the language server is running against.
    ///
    /// Returns a borrow of the `root_path` field; nothing is copied.
    pub fn root_path(&self) -> &PathBuf {
        &self.root_path
    }
 842
 843    /// Sends a RPC request to the language server.
 844    ///
 845    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
 846    pub fn request<T: request::Request>(
 847        &self,
 848        params: T::Params,
 849    ) -> impl Future<Output = Result<T::Result>>
 850    where
 851        T::Result: 'static + Send,
 852    {
 853        Self::request_internal::<T>(
 854            &self.next_id,
 855            &self.response_handlers,
 856            &self.outbound_tx,
 857            &self.executor,
 858            params,
 859        )
 860    }
 861
 862    fn request_internal<T: request::Request>(
 863        next_id: &AtomicI32,
 864        response_handlers: &Mutex<Option<HashMap<RequestId, ResponseHandler>>>,
 865        outbound_tx: &channel::Sender<String>,
 866        executor: &BackgroundExecutor,
 867        params: T::Params,
 868    ) -> impl 'static + Future<Output = anyhow::Result<T::Result>>
 869    where
 870        T::Result: 'static + Send,
 871    {
 872        let id = next_id.fetch_add(1, SeqCst);
 873        let message = serde_json::to_string(&Request {
 874            jsonrpc: JSON_RPC_VERSION,
 875            id: RequestId::Int(id),
 876            method: T::METHOD,
 877            params,
 878        })
 879        .unwrap();
 880
 881        let (tx, rx) = oneshot::channel();
 882        let handle_response = response_handlers
 883            .lock()
 884            .as_mut()
 885            .ok_or_else(|| anyhow!("server shut down"))
 886            .map(|handlers| {
 887                let executor = executor.clone();
 888                handlers.insert(
 889                    RequestId::Int(id),
 890                    Box::new(move |result| {
 891                        executor
 892                            .spawn(async move {
 893                                let response = match result {
 894                                    Ok(response) => serde_json::from_str(&response)
 895                                        .context("failed to deserialize response"),
 896                                    Err(error) => Err(anyhow!("{}", error.message)),
 897                                };
 898                                _ = tx.send(response);
 899                            })
 900                            .detach();
 901                    }),
 902                );
 903            });
 904
 905        let send = outbound_tx
 906            .try_send(message)
 907            .context("failed to write to language server's stdin");
 908
 909        let outbound_tx = outbound_tx.downgrade();
 910        let mut timeout = executor.timer(LSP_REQUEST_TIMEOUT).fuse();
 911        let started = Instant::now();
 912        async move {
 913            handle_response?;
 914            send?;
 915
 916            let cancel_on_drop = util::defer(move || {
 917                if let Some(outbound_tx) = outbound_tx.upgrade() {
 918                    Self::notify_internal::<notification::Cancel>(
 919                        &outbound_tx,
 920                        CancelParams {
 921                            id: NumberOrString::Number(id as i32),
 922                        },
 923                    )
 924                    .log_err();
 925                }
 926            });
 927
 928            let method = T::METHOD;
 929            futures::select! {
 930                response = rx.fuse() => {
 931                    let elapsed = started.elapsed();
 932                    log::trace!("Took {elapsed:?} to receive response to {method:?} id {id}");
 933                    cancel_on_drop.abort();
 934                    response?
 935                }
 936
 937                _ = timeout => {
 938                    log::error!("Cancelled LSP request task for {method:?} id {id} which took over {LSP_REQUEST_TIMEOUT:?}");
 939                    anyhow::bail!("LSP request timeout");
 940                }
 941            }
 942        }
 943    }
 944
 945    /// Sends a RPC notification to the language server.
 946    ///
 947    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
 948    pub fn notify<T: notification::Notification>(&self, params: T::Params) -> Result<()> {
 949        Self::notify_internal::<T>(&self.outbound_tx, params)
 950    }
 951
 952    fn notify_internal<T: notification::Notification>(
 953        outbound_tx: &channel::Sender<String>,
 954        params: T::Params,
 955    ) -> Result<()> {
 956        let message = serde_json::to_string(&Notification {
 957            jsonrpc: JSON_RPC_VERSION,
 958            method: T::METHOD,
 959            params,
 960        })
 961        .unwrap();
 962        outbound_tx.try_send(message)?;
 963        Ok(())
 964    }
 965}
 966
 967impl Drop for LanguageServer {
 968    fn drop(&mut self) {
 969        if let Some(shutdown) = self.shutdown() {
 970            self.executor.spawn(shutdown).detach();
 971        }
 972    }
 973}
 974
 975impl Subscription {
 976    /// Detaching a subscription handle prevents it from unsubscribing on drop.
 977    pub fn detach(&mut self) {
 978        match self {
 979            Subscription::Notification {
 980                notification_handlers,
 981                ..
 982            } => *notification_handlers = None,
 983            Subscription::Io { io_handlers, .. } => *io_handlers = None,
 984        }
 985    }
 986}
 987
 988impl fmt::Display for LanguageServerId {
 989    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 990        self.0.fmt(f)
 991    }
 992}
 993
 994impl fmt::Debug for LanguageServer {
 995    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 996        f.debug_struct("LanguageServer")
 997            .field("id", &self.server_id.0)
 998            .field("name", &self.name)
 999            .finish_non_exhaustive()
1000    }
1001}
1002
1003impl Drop for Subscription {
1004    fn drop(&mut self) {
1005        match self {
1006            Subscription::Notification {
1007                method,
1008                notification_handlers,
1009            } => {
1010                if let Some(handlers) = notification_handlers {
1011                    handlers.lock().remove(method);
1012                }
1013            }
1014            Subscription::Io { id, io_handlers } => {
1015                if let Some(io_handlers) = io_handlers.as_ref().and_then(|h| h.upgrade()) {
1016                    io_handlers.lock().remove(id);
1017                }
1018            }
1019        }
1020    }
1021}
1022
/// Mock language server for use in tests.
///
/// Only compiled for tests or when the `test-support` feature is enabled.
#[cfg(any(test, feature = "test-support"))]
#[derive(Clone)]
pub struct FakeLanguageServer {
    /// The [`LanguageServer`] instance that plays the server side of the connection.
    pub server: Arc<LanguageServer>,
    /// Receives `(method, params)` string pairs; read by
    /// `try_receive_notification`.
    notifications_rx: channel::Receiver<(String, String)>,
}
1030
#[cfg(any(test, feature = "test-support"))]
impl FakeLanguageServer {
    /// Builds a connected pair: a [`LanguageServer`] and the fake server it talks to.
    ///
    /// The two sides are joined by two in-memory pipes, each one's writer
    /// feeding the other's reader. Every message passed to the fake side's
    /// callback is forwarded as a `(method, params)` pair to the notification
    /// channel, with `"null"` standing in for absent params. The fake also
    /// answers `Initialize` requests with the given `capabilities` and `name`.
    pub fn new(
        name: String,
        capabilities: ServerCapabilities,
        cx: AsyncAppContext,
    ) -> (LanguageServer, FakeLanguageServer) {
        let (stdin_writer, stdin_reader) = async_pipe::pipe();
        let (stdout_writer, stdout_reader) = async_pipe::pipe();
        let (notifications_tx, notifications_rx) = channel::unbounded();

        // Client side: writes to "stdin", reads from "stdout".
        let server = LanguageServer::new_internal(
            LanguageServerId(0),
            stdin_writer,
            stdout_reader,
            None::<async_pipe::PipeReader>,
            Arc::new(Mutex::new(None)),
            None,
            Path::new("/"),
            None,
            cx.clone(),
            |_| {},
        );

        // Server side: the mirror image of the client's pipes.
        let fake_inner = LanguageServer::new_internal(
            LanguageServerId(0),
            stdout_writer,
            stdin_reader,
            None::<async_pipe::PipeReader>,
            Arc::new(Mutex::new(None)),
            None,
            Path::new("/"),
            None,
            cx,
            move |msg| {
                notifications_tx
                    .try_send((
                        msg.method.to_string(),
                        msg.params
                            .map(|raw_value| raw_value.get())
                            .unwrap_or("null")
                            .to_string(),
                    ))
                    .ok();
            },
        );

        let fake = FakeLanguageServer {
            server: Arc::new(fake_inner),
            notifications_rx,
        };
        fake.handle_request::<request::Initialize, _, _>(move |_, _| {
            let capabilities = capabilities.clone();
            let name = name.clone();
            async move {
                Ok(InitializeResult {
                    capabilities,
                    server_info: Some(ServerInfo {
                        name,
                        ..Default::default()
                    }),
                })
            }
        });

        (server, fake)
    }
}
1100
#[cfg(any(test, feature = "test-support"))]
impl LanguageServer {
    /// Capability preset for tests.
    ///
    /// Enables the definition, type-definition, document-highlight,
    /// code-action, formatting and range-formatting providers; every other
    /// capability keeps its default value.
    pub fn full_capabilities() -> ServerCapabilities {
        ServerCapabilities {
            definition_provider: Some(OneOf::Left(true)),
            type_definition_provider: Some(TypeDefinitionProviderCapability::Simple(true)),
            document_highlight_provider: Some(OneOf::Left(true)),
            code_action_provider: Some(CodeActionProviderCapability::Simple(true)),
            document_formatting_provider: Some(OneOf::Left(true)),
            document_range_formatting_provider: Some(OneOf::Left(true)),
            ..Default::default()
        }
    }
}
1115
#[cfg(any(test, feature = "test-support"))]
impl FakeLanguageServer {
    /// See [`LanguageServer::notify`]. Any error from sending is discarded.
    pub fn notify<T: notification::Notification>(&self, params: T::Params) {
        let _ = self.server.notify::<T>(params);
    }

    /// See [`LanguageServer::request`].
    ///
    /// Calls `start_waiting` on the executor before issuing the request.
    pub async fn request<T>(&self, params: T::Params) -> Result<T::Result>
    where
        T: request::Request,
        T::Result: 'static + Send,
    {
        self.server.executor.start_waiting();
        let response = self.server.request::<T>(params);
        response.await
    }

    /// Waits for the next notification of type `T`, like
    /// [`Self::try_receive_notification`], and unwraps the result.
    ///
    /// # Panics
    ///
    /// Panics if the notification channel ends before a `T` notification arrives.
    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
        self.server.executor.start_waiting();
        let received = self.try_receive_notification::<T>().await;
        received.unwrap()
    }

    /// Consumes the notification channel until it finds a notification for the specified type.
    ///
    /// Notifications of other types are logged and dropped. Returns `None`
    /// once the channel yields no further items.
    ///
    /// # Panics
    ///
    /// Panics if the matching notification's params do not deserialize into `T::Params`.
    pub async fn try_receive_notification<T: notification::Notification>(
        &mut self,
    ) -> Option<T::Params> {
        use futures::StreamExt as _;

        while let Some((method, params)) = self.notifications_rx.next().await {
            if method == T::METHOD {
                return Some(serde_json::from_str::<T::Params>(&params).unwrap());
            }
            log::info!("skipping message in fake language server {:?}", params);
        }
        None
    }

    /// Registers a handler for a specific kind of request. Removes any existing handler for specified request type.
    ///
    /// The returned receiver gets one `()` each time the handler's future has
    /// produced its result.
    pub fn handle_request<T, F, Fut>(
        &self,
        mut handler: F,
    ) -> futures::channel::mpsc::UnboundedReceiver<()>
    where
        T: 'static + request::Request,
        T::Params: 'static + Send,
        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> Fut,
        Fut: 'static + Send + Future<Output = Result<T::Result>>,
    {
        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
        self.server.remove_request_handler::<T>();
        let mut subscription = self.server.on_request::<T, _, _>(move |params, cx| {
            let pending = handler(params, cx.clone());
            let responded_tx = responded_tx.clone();
            let executor = cx.background_executor().clone();
            async move {
                executor.simulate_random_delay().await;
                let outcome = pending.await;
                responded_tx.unbounded_send(()).ok();
                outcome
            }
        });
        // Keep the handler installed after `subscription` goes out of scope.
        subscription.detach();
        responded_rx
    }

    /// Registers a handler for a specific kind of notification. Removes any existing handler for specified notification type.
    ///
    /// The returned receiver gets one `()` each time the handler has run.
    pub fn handle_notification<T, F>(
        &self,
        mut handler: F,
    ) -> futures::channel::mpsc::UnboundedReceiver<()>
    where
        T: 'static + notification::Notification,
        T::Params: 'static + Send,
        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext),
    {
        let (handled_tx, handled_rx) = futures::channel::mpsc::unbounded();
        self.server.remove_notification_handler::<T>();
        let mut subscription = self.server.on_notification::<T, _>(move |params, cx| {
            handler(params, cx.clone());
            handled_tx.unbounded_send(()).ok();
        });
        // Keep the handler installed after `subscription` goes out of scope.
        subscription.detach();
        handled_rx
    }

    /// Removes any existing handler for specified request type.
    pub fn remove_request_handler<T>(&mut self)
    where
        T: 'static + request::Request,
    {
        self.server.remove_request_handler::<T>();
    }

    /// Simulate that the server has started work and notifies about its progress with the specified token.
    ///
    /// Sends a `WorkDoneProgressCreate` request for the token, waits for the
    /// reply, then sends a `Begin` progress notification.
    ///
    /// # Panics
    ///
    /// Panics if the `WorkDoneProgressCreate` request fails.
    pub async fn start_progress(&self, token: impl Into<String>) {
        let token = NumberOrString::String(token.into());
        let create_params = WorkDoneProgressCreateParams {
            token: token.clone(),
        };
        self.request::<request::WorkDoneProgressCreate>(create_params)
            .await
            .unwrap();

        let begin = WorkDoneProgress::Begin(Default::default());
        self.notify::<notification::Progress>(ProgressParams {
            token,
            value: ProgressParamsValue::WorkDone(begin),
        });
    }

    /// Simulate that the server has completed work and notifies about that with the specified token.
    pub fn end_progress(&self, token: impl Into<String>) {
        let token = NumberOrString::String(token.into());
        let end = WorkDoneProgress::End(Default::default());
        self.notify::<notification::Progress>(ProgressParams {
            token,
            value: ProgressParamsValue::WorkDone(end),
        });
    }
}
1235
1236pub(self) async fn read_headers<Stdout>(
1237    reader: &mut BufReader<Stdout>,
1238    buffer: &mut Vec<u8>,
1239) -> Result<()>
1240where
1241    Stdout: AsyncRead + Unpin + Send + 'static,
1242{
1243    loop {
1244        if buffer.len() >= HEADER_DELIMITER.len()
1245            && buffer[(buffer.len() - HEADER_DELIMITER.len())..] == HEADER_DELIMITER[..]
1246        {
1247            return Ok(());
1248        }
1249
1250        if reader.read_until(b'\n', buffer).await? == 0 {
1251            return Err(anyhow!("cannot read LSP message headers"));
1252        }
1253    }
1254}
1255
#[cfg(test)]
mod tests {
    use super::*;
    use gpui::TestAppContext;

    // Only install the logger when the user asked for log output.
    #[ctor::ctor]
    fn init_logger() {
        if std::env::var("RUST_LOG").is_ok() {
            env_logger::init();
        }
    }

    /// Round-trips notifications in both directions between a `LanguageServer`
    /// and its `FakeLanguageServer`, then checks that dropping the server
    /// results in an `Exit` notification.
    #[gpui::test]
    async fn test_fake(cx: &mut TestAppContext) {
        cx.update(|cx| {
            release_channel::init("0.0.0", cx);
        });
        let (server, mut fake) =
            FakeLanguageServer::new("the-lsp".to_string(), Default::default(), cx.to_async());

        let (message_tx, message_rx) = channel::unbounded();
        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
        server
            .on_notification::<notification::ShowMessage, _>(move |params, _| {
                message_tx.try_send(params).unwrap()
            })
            .detach();
        server
            .on_notification::<notification::PublishDiagnostics, _>(move |params, _| {
                diagnostics_tx.try_send(params).unwrap()
            })
            .detach();

        // Client -> fake server.
        let server = cx.update(|cx| server.initialize(None, cx)).await.unwrap();
        server
            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
                text_document: TextDocumentItem::new(
                    Url::from_str("file://a/b").unwrap(),
                    "rust".to_string(),
                    0,
                    "".to_string(),
                ),
            })
            .unwrap();
        assert_eq!(
            fake.receive_notification::<notification::DidOpenTextDocument>()
                .await
                .text_document
                .uri
                .as_str(),
            "file://a/b"
        );

        // Fake server -> client.
        fake.notify::<notification::ShowMessage>(ShowMessageParams {
            typ: MessageType::ERROR,
            message: "ok".to_string(),
        });
        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
            uri: Url::from_str("file://b/c").unwrap(),
            version: Some(5),
            diagnostics: vec![],
        });
        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
        assert_eq!(
            diagnostics_rx.recv().await.unwrap().uri.as_str(),
            "file://b/c"
        );

        fake.handle_request::<request::Shutdown, _, _>(|_, _| async move { Ok(()) });

        drop(server);
        fake.receive_notification::<notification::Exit>().await;
    }

    /// `read_headers` must stop right after the blank line that ends the
    /// header section, whatever the header order, and leave the body unread.
    #[gpui::test]
    async fn test_read_headers() {
        let mut buf = Vec::new();
        let mut reader = smol::io::BufReader::new(b"Content-Length: 123\r\n\r\n" as &[u8]);
        read_headers(&mut reader, &mut buf).await.unwrap();
        assert_eq!(buf, b"Content-Length: 123\r\n\r\n");

        let mut buf = Vec::new();
        let mut reader = smol::io::BufReader::new(b"Content-Type: application/vscode-jsonrpc\r\nContent-Length: 1235\r\n\r\n{\"somecontent\":123}" as &[u8]);
        read_headers(&mut reader, &mut buf).await.unwrap();
        assert_eq!(
            buf,
            b"Content-Type: application/vscode-jsonrpc\r\nContent-Length: 1235\r\n\r\n"
        );

        let mut buf = Vec::new();
        let mut reader = smol::io::BufReader::new(b"Content-Length: 1235\r\nContent-Type: application/vscode-jsonrpc\r\n\r\n{\"somecontent\":true}" as &[u8]);
        read_headers(&mut reader, &mut buf).await.unwrap();
        assert_eq!(
            buf,
            b"Content-Length: 1235\r\nContent-Type: application/vscode-jsonrpc\r\n\r\n"
        );
    }

    /// A string id made of digits must stay a string id.
    #[gpui::test]
    fn test_deserialize_string_digit_id() {
        let json = r#"{"jsonrpc":"2.0","id":"2","method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
        let notification = serde_json::from_str::<AnyNotification>(json)
            .expect("message with string id should be parsed");
        let expected_id = RequestId::Str("2".to_string());
        assert_eq!(notification.id, Some(expected_id));
    }

    /// An arbitrary string id is accepted.
    #[gpui::test]
    fn test_deserialize_string_id() {
        let json = r#"{"jsonrpc":"2.0","id":"anythingAtAll","method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
        let notification = serde_json::from_str::<AnyNotification>(json)
            .expect("message with string id should be parsed");
        let expected_id = RequestId::Str("anythingAtAll".to_string());
        assert_eq!(notification.id, Some(expected_id));
    }

    /// A numeric id is parsed as an integer id.
    #[gpui::test]
    fn test_deserialize_int_id() {
        let json = r#"{"jsonrpc":"2.0","id":2,"method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
        let notification = serde_json::from_str::<AnyNotification>(json)
            .expect("message with integer id should be parsed");
        let expected_id = RequestId::Int(2);
        assert_eq!(notification.id, Some(expected_id));
    }
}