lsp.rs

   1use log::warn;
   2pub use lsp_types::request::*;
   3pub use lsp_types::*;
   4
   5use anyhow::{anyhow, Context, Result};
   6use collections::HashMap;
   7use futures::{channel::oneshot, io::BufWriter, select, AsyncRead, AsyncWrite, FutureExt};
   8use gpui::{AppContext, AsyncAppContext, BackgroundExecutor, Task};
   9use parking_lot::Mutex;
  10use postage::{barrier, prelude::Stream};
  11use serde::{de::DeserializeOwned, Deserialize, Serialize};
  12use serde_json::{json, value::RawValue, Value};
  13use smol::{
  14    channel,
  15    io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
  16    process::{self, Child},
  17};
  18
  19#[cfg(target_os = "windows")]
  20use smol::process::windows::CommandExt;
  21
  22use std::{
  23    ffi::OsString,
  24    fmt,
  25    future::Future,
  26    io::Write,
  27    path::PathBuf,
  28    str::{self, FromStr as _},
  29    sync::{
  30        atomic::{AtomicI32, Ordering::SeqCst},
  31        Arc, Weak,
  32    },
  33    time::{Duration, Instant},
  34};
  35use std::{path::Path, process::Stdio};
  36use util::{ResultExt, TryFutureExt};
  37
/// The `\r\n\r\n` byte sequence that separates the header section of a message from its body.
/// NOTE(review): presumably consumed by `read_headers`, which is not visible here — confirm.
const HEADER_DELIMITER: &'static [u8; 4] = b"\r\n\r\n";
/// JSON-RPC protocol version string.
const JSON_RPC_VERSION: &str = "2.0";
/// Header prefix that precedes the byte length of a message body, both when parsing
/// incoming messages and when framing outgoing ones.
const CONTENT_LEN_HEADER: &str = "Content-Length: ";
/// A two-minute duration.
/// NOTE(review): presumably the per-request timeout; its use is not visible in this part of the file.
const LSP_REQUEST_TIMEOUT: Duration = Duration::from_secs(60 * 2);
/// How long `LanguageServer::shutdown` waits for the shutdown request to complete
/// before it stops waiting and proceeds with the rest of the shutdown sequence.
const SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
  43
/// Callback for an incoming message that carries a `method`. It receives the optional
/// message id, the raw JSON text of `params` (`"null"` when absent) and an async app context.
type NotificationHandler = Box<dyn Send + FnMut(Option<RequestId>, &str, AsyncAppContext)>;
/// One-shot callback for the response to an outgoing request. It receives the raw JSON
/// text of the result, or the error reported by the server.
type ResponseHandler = Box<dyn Send + FnOnce(Result<String, Error>)>;
/// Callback that observes raw stdio traffic, tagged with the stream it belongs to.
type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
  47
/// Kind of language server stdio given to an IO handler.
#[derive(Debug, Clone, Copy)]
pub enum IoKind {
    /// A message read from the server's standard output.
    StdOut,
    /// A message about to be written to the server's standard input.
    StdIn,
    /// A line read from the server's standard error.
    StdErr,
}
  55
/// Represents a launchable language server. This can either be a standalone binary or the path
/// to a runtime with arguments to instruct it to launch the actual language server file.
#[derive(Debug, Clone, Deserialize)]
pub struct LanguageServerBinary {
    /// Path of the executable to spawn.
    pub path: PathBuf,
    /// Command-line arguments passed to the executable.
    pub arguments: Vec<OsString>,
    /// Extra environment variables for the process; `None` adds nothing.
    pub env: Option<HashMap<String, String>>,
}
  64
/// A running language server process.
pub struct LanguageServer {
    /// Identifier supplied by the caller at construction time.
    server_id: LanguageServerId,
    /// Monotonic counter used to allocate ids; `on_io` draws its subscription ids from it.
    next_id: AtomicI32,
    /// Queue of serialized outgoing messages, drained by the output task and written to stdin.
    outbound_tx: channel::Sender<String>,
    /// Display name: the binary's file name at first, replaced by the name the server
    /// reports in its `initialize` response when one is present.
    name: Arc<str>,
    /// Capabilities reported by the server in its `initialize` response.
    capabilities: ServerCapabilities,
    /// Code action kinds supplied at construction time.
    code_action_kinds: Option<Vec<CodeActionKind>>,
    /// Handlers for incoming messages, keyed by method name.
    notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
    /// Pending response callbacks keyed by request id. The whole map is taken (set to `None`)
    /// when the input or output loop ends and during shutdown.
    response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
    /// Observers of raw stdio traffic, keyed by subscription id.
    io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
    /// Background executor used for timers and background work.
    executor: BackgroundExecutor,
    /// The `(input, output)` I/O tasks; taken by `shutdown` so that it only runs once.
    #[allow(clippy::type_complexity)]
    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
    /// Receiver that is released once the output task drops its barrier sender.
    output_done_rx: Mutex<Option<barrier::Receiver>>,
    /// Root path this server was started for.
    root_path: PathBuf,
    /// Handle to the child process, if this server owns one.
    server: Arc<Mutex<Option<Child>>>,
}
  83
/// Identifies a running language server.
///
/// A transparent newtype around `usize` that can be copied, compared, ordered and hashed.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct LanguageServerId(pub usize);
  88
/// Handle to a language server RPC activity subscription.
///
/// NOTE(review): presumably dropping the handle unregisters the handler; the `Drop`
/// implementation is not visible in this part of the file — confirm.
pub enum Subscription {
    /// Subscription to incoming messages for a single method.
    Notification {
        /// Method name the handler was registered under.
        method: &'static str,
        /// Strong reference to the handler map the handler was inserted into.
        notification_handlers: Option<Arc<Mutex<HashMap<&'static str, NotificationHandler>>>>,
    },
    /// Subscription to raw stdio traffic.
    Io {
        /// Id the handler was registered under.
        id: i32,
        /// Weak reference to the handler map the handler was inserted into.
        io_handlers: Option<Weak<Mutex<HashMap<i32, IoHandler>>>>,
    },
}
 100
/// Language server protocol RPC request message ID.
///
/// Serialized untagged, so the id appears on the wire as a bare integer or a bare string.
///
/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[serde(untagged)]
pub enum RequestId {
    /// Numeric id.
    Int(i32),
    /// String id.
    Str(String),
}
 110
/// Language server protocol RPC request message.
///
/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
#[derive(Serialize, Deserialize)]
pub struct Request<'a, T> {
    /// JSON-RPC version string.
    jsonrpc: &'static str,
    /// Id that the matching response will carry.
    id: RequestId,
    /// Name of the method being invoked.
    method: &'a str,
    /// Method-specific parameters.
    params: T,
}
 121
/// Language server protocol RPC request response message before it is deserialized into a concrete type.
#[derive(Serialize, Deserialize)]
struct AnyResponse<'a> {
    /// JSON-RPC version string as received.
    jsonrpc: &'a str,
    /// Id of the request this response answers.
    id: RequestId,
    /// Error reported by the server; `None` when the field is missing.
    #[serde(default)]
    error: Option<Error>,
    /// Raw, still-unparsed JSON of the result, borrowed from the input buffer.
    #[serde(borrow)]
    result: Option<&'a RawValue>,
}
 132
/// Language server protocol RPC request response message.
///
/// This type is serialize-only. Both `result` and `error` are always written, so a `None`
/// value is emitted as `null`.
///
/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#responseMessage)
#[derive(Serialize)]
struct Response<T> {
    /// JSON-RPC version string.
    jsonrpc: &'static str,
    /// Id of the request being answered.
    id: RequestId,
    /// Successful result, if any.
    result: Option<T>,
    /// Error, if any.
    error: Option<Error>,
}
 143
/// Language server protocol RPC notification message.
///
/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
#[derive(Serialize, Deserialize)]
struct Notification<'a, T> {
    /// JSON-RPC version string.
    jsonrpc: &'static str,
    /// Name of the notification method; borrowed from the input when deserialized.
    #[serde(borrow)]
    method: &'a str,
    /// Method-specific parameters.
    params: T,
}
 154
/// Language server RPC notification message before it is deserialized into a concrete type.
///
/// Because `id` is optional, any incoming message that has a `method` field
/// deserializes into this type, whether or not it carries an id.
#[derive(Debug, Clone, Deserialize)]
struct AnyNotification<'a> {
    /// Message id; `None` when the field is missing.
    #[serde(default)]
    id: Option<RequestId>,
    /// Method name, borrowed from the input buffer.
    #[serde(borrow)]
    method: &'a str,
    /// Raw, still-unparsed JSON of the parameters; `None` when the field is missing.
    #[serde(borrow, default)]
    params: Option<&'a RawValue>,
}
 165
/// Error payload of a response message. Only the `message` field is modeled here.
#[derive(Debug, Serialize, Deserialize)]
struct Error {
    /// Human-readable description of the failure.
    message: String,
}
 170
/// Experimental: Informs the end user about the state of the server
///
/// This is an uninhabited marker type: it only exists to carry the
/// `lsp_types::notification::Notification` implementation for `experimental/serverStatus`.
///
/// [Rust Analyzer Specification](https://github.com/rust-lang/rust-analyzer/blob/master/docs/dev/lsp-extensions.md#server-status)
#[derive(Debug)]
pub enum ServerStatus {}
 176
/// Health reported by a server in an `experimental/serverStatus` notification.
///
/// Variant names are (de)serialized in camelCase. The `Other(String)` variant is meant to
/// handle unknown values, since this notification is still experimental.
///
/// NOTE(review): with serde's default enum representation a newtype variant is read from
/// a map such as `{"other": "..."}`, not from an arbitrary bare string, so `Other` may not
/// actually catch unknown health values — confirm against real server output.
#[derive(Debug, PartialEq, Deserialize, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub enum ServerHealthStatus {
    /// The server reports no problems.
    Ok,
    /// The server reports a warning.
    Warning,
    /// The server reports an error.
    Error,
    /// Any other value (see the note above).
    Other(String),
}
 186
/// Parameters of the `experimental/serverStatus` notification.
/// Field names are (de)serialized in camelCase.
#[derive(Debug, PartialEq, Deserialize, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ServerStatusParams {
    /// Reported health of the server.
    pub health: ServerHealthStatus,
    /// Optional message accompanying the status.
    pub message: Option<String>,
}
 193
/// Binds the `experimental/serverStatus` method name to its parameter type so the
/// notification can be used with the typed handler-registration methods.
impl lsp_types::notification::Notification for ServerStatus {
    type Params = ServerStatusParams;
    const METHOD: &'static str = "experimental/serverStatus";
}
 198
 199impl LanguageServer {
 200    /// Starts a language server process.
 201    pub fn new(
 202        stderr_capture: Arc<Mutex<Option<String>>>,
 203        server_id: LanguageServerId,
 204        binary: LanguageServerBinary,
 205        root_path: &Path,
 206        code_action_kinds: Option<Vec<CodeActionKind>>,
 207        cx: AsyncAppContext,
 208    ) -> Result<Self> {
 209        let working_dir = if root_path.is_dir() {
 210            root_path
 211        } else {
 212            root_path.parent().unwrap_or_else(|| Path::new("/"))
 213        };
 214
 215        log::info!(
 216            "starting language server. binary path: {:?}, working directory: {:?}, args: {:?}",
 217            binary.path,
 218            working_dir,
 219            &binary.arguments
 220        );
 221
 222        let mut command = process::Command::new(&binary.path);
 223        command
 224            .current_dir(working_dir)
 225            .args(binary.arguments)
 226            .envs(binary.env.unwrap_or_default())
 227            .stdin(Stdio::piped())
 228            .stdout(Stdio::piped())
 229            .stderr(Stdio::piped())
 230            .kill_on_drop(true);
 231        #[cfg(windows)]
 232        command.creation_flags(windows::Win32::System::Threading::CREATE_NO_WINDOW.0);
 233        let mut server = command.spawn()?;
 234
 235        let stdin = server.stdin.take().unwrap();
 236        let stdout = server.stdout.take().unwrap();
 237        let stderr = server.stderr.take().unwrap();
 238        let mut server = Self::new_internal(
 239            server_id,
 240            stdin,
 241            stdout,
 242            Some(stderr),
 243            stderr_capture,
 244            Some(server),
 245            root_path,
 246            code_action_kinds,
 247            cx,
 248            move |notification| {
 249                log::info!(
 250                    "Language server with id {} sent unhandled notification {}:\n{}",
 251                    server_id,
 252                    notification.method,
 253                    serde_json::to_string_pretty(
 254                        &notification
 255                            .params
 256                            .and_then(|params| Value::from_str(params.get()).ok())
 257                            .unwrap_or(Value::Null)
 258                    )
 259                    .unwrap(),
 260                );
 261            },
 262        );
 263
 264        if let Some(name) = binary.path.file_name() {
 265            server.name = name.to_string_lossy().into();
 266        }
 267
 268        Ok(server)
 269    }
 270
 271    #[allow(clippy::too_many_arguments)]
 272    fn new_internal<Stdin, Stdout, Stderr, F>(
 273        server_id: LanguageServerId,
 274        stdin: Stdin,
 275        stdout: Stdout,
 276        stderr: Option<Stderr>,
 277        stderr_capture: Arc<Mutex<Option<String>>>,
 278        server: Option<Child>,
 279        root_path: &Path,
 280        code_action_kinds: Option<Vec<CodeActionKind>>,
 281        cx: AsyncAppContext,
 282        on_unhandled_notification: F,
 283    ) -> Self
 284    where
 285        Stdin: AsyncWrite + Unpin + Send + 'static,
 286        Stdout: AsyncRead + Unpin + Send + 'static,
 287        Stderr: AsyncRead + Unpin + Send + 'static,
 288        F: FnMut(AnyNotification) + 'static + Send + Sync + Clone,
 289    {
 290        let (outbound_tx, outbound_rx) = channel::unbounded::<String>();
 291        let (output_done_tx, output_done_rx) = barrier::channel();
 292        let notification_handlers =
 293            Arc::new(Mutex::new(HashMap::<_, NotificationHandler>::default()));
 294        let response_handlers =
 295            Arc::new(Mutex::new(Some(HashMap::<_, ResponseHandler>::default())));
 296        let io_handlers = Arc::new(Mutex::new(HashMap::default()));
 297
 298        let stdout_input_task = cx.spawn({
 299            let on_unhandled_notification = on_unhandled_notification.clone();
 300            let notification_handlers = notification_handlers.clone();
 301            let response_handlers = response_handlers.clone();
 302            let io_handlers = io_handlers.clone();
 303            move |cx| {
 304                Self::handle_input(
 305                    stdout,
 306                    on_unhandled_notification,
 307                    notification_handlers,
 308                    response_handlers,
 309                    io_handlers,
 310                    cx,
 311                )
 312                .log_err()
 313            }
 314        });
 315        let stderr_input_task = stderr
 316            .map(|stderr| {
 317                let io_handlers = io_handlers.clone();
 318                let stderr_captures = stderr_capture.clone();
 319                cx.spawn(|_| Self::handle_stderr(stderr, io_handlers, stderr_captures).log_err())
 320            })
 321            .unwrap_or_else(|| Task::Ready(Some(None)));
 322        let input_task = cx.spawn(|_| async move {
 323            let (stdout, stderr) = futures::join!(stdout_input_task, stderr_input_task);
 324            stdout.or(stderr)
 325        });
 326        let output_task = cx.background_executor().spawn({
 327            Self::handle_output(
 328                stdin,
 329                outbound_rx,
 330                output_done_tx,
 331                response_handlers.clone(),
 332                io_handlers.clone(),
 333            )
 334            .log_err()
 335        });
 336
 337        Self {
 338            server_id,
 339            notification_handlers,
 340            response_handlers,
 341            io_handlers,
 342            name: "".into(),
 343            capabilities: Default::default(),
 344            code_action_kinds,
 345            next_id: Default::default(),
 346            outbound_tx,
 347            executor: cx.background_executor().clone(),
 348            io_tasks: Mutex::new(Some((input_task, output_task))),
 349            output_done_rx: Mutex::new(Some(output_done_rx)),
 350            root_path: root_path.to_path_buf(),
 351            server: Arc::new(Mutex::new(server)),
 352        }
 353    }
 354
 355    /// List of code action kinds this language server reports being able to emit.
 356    pub fn code_action_kinds(&self) -> Option<Vec<CodeActionKind>> {
 357        self.code_action_kinds.clone()
 358    }
 359
 360    async fn handle_input<Stdout, F>(
 361        stdout: Stdout,
 362        mut on_unhandled_notification: F,
 363        notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
 364        response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
 365        io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 366        cx: AsyncAppContext,
 367    ) -> anyhow::Result<()>
 368    where
 369        Stdout: AsyncRead + Unpin + Send + 'static,
 370        F: FnMut(AnyNotification) + 'static + Send,
 371    {
 372        let mut stdout = BufReader::new(stdout);
 373        let _clear_response_handlers = util::defer({
 374            let response_handlers = response_handlers.clone();
 375            move || {
 376                response_handlers.lock().take();
 377            }
 378        });
 379        let mut buffer = Vec::new();
 380        loop {
 381            buffer.clear();
 382
 383            read_headers(&mut stdout, &mut buffer).await?;
 384
 385            let headers = std::str::from_utf8(&buffer)?;
 386
 387            let message_len = headers
 388                .split('\n')
 389                .find(|line| line.starts_with(CONTENT_LEN_HEADER))
 390                .and_then(|line| line.strip_prefix(CONTENT_LEN_HEADER))
 391                .ok_or_else(|| anyhow!("invalid LSP message header {headers:?}"))?
 392                .trim_end()
 393                .parse()?;
 394
 395            buffer.resize(message_len, 0);
 396            stdout.read_exact(&mut buffer).await?;
 397
 398            if let Ok(message) = str::from_utf8(&buffer) {
 399                log::trace!("incoming message: {message}");
 400                for handler in io_handlers.lock().values_mut() {
 401                    handler(IoKind::StdOut, message);
 402                }
 403            }
 404
 405            if let Ok(msg) = serde_json::from_slice::<AnyNotification>(&buffer) {
 406                if let Some(handler) = notification_handlers.lock().get_mut(msg.method) {
 407                    handler(
 408                        msg.id,
 409                        msg.params.map(|params| params.get()).unwrap_or("null"),
 410                        cx.clone(),
 411                    );
 412                } else {
 413                    on_unhandled_notification(msg);
 414                }
 415            } else if let Ok(AnyResponse {
 416                id, error, result, ..
 417            }) = serde_json::from_slice(&buffer)
 418            {
 419                if let Some(handler) = response_handlers
 420                    .lock()
 421                    .as_mut()
 422                    .and_then(|handlers| handlers.remove(&id))
 423                {
 424                    if let Some(error) = error {
 425                        handler(Err(error));
 426                    } else if let Some(result) = result {
 427                        handler(Ok(result.get().into()));
 428                    } else {
 429                        handler(Ok("null".into()));
 430                    }
 431                }
 432            } else {
 433                warn!(
 434                    "failed to deserialize LSP message:\n{}",
 435                    std::str::from_utf8(&buffer)?
 436                );
 437            }
 438
 439            // Don't starve the main thread when receiving lots of messages at once.
 440            smol::future::yield_now().await;
 441        }
 442    }
 443
 444    async fn handle_stderr<Stderr>(
 445        stderr: Stderr,
 446        io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 447        stderr_capture: Arc<Mutex<Option<String>>>,
 448    ) -> anyhow::Result<()>
 449    where
 450        Stderr: AsyncRead + Unpin + Send + 'static,
 451    {
 452        let mut stderr = BufReader::new(stderr);
 453        let mut buffer = Vec::new();
 454
 455        loop {
 456            buffer.clear();
 457
 458            let bytes_read = stderr.read_until(b'\n', &mut buffer).await?;
 459            if bytes_read == 0 {
 460                return Ok(());
 461            }
 462
 463            if let Ok(message) = str::from_utf8(&buffer) {
 464                log::trace!("incoming stderr message:{message}");
 465                for handler in io_handlers.lock().values_mut() {
 466                    handler(IoKind::StdErr, message);
 467                }
 468
 469                if let Some(stderr) = stderr_capture.lock().as_mut() {
 470                    stderr.push_str(message);
 471                }
 472            }
 473
 474            // Don't starve the main thread when receiving lots of messages at once.
 475            smol::future::yield_now().await;
 476        }
 477    }
 478
 479    async fn handle_output<Stdin>(
 480        stdin: Stdin,
 481        outbound_rx: channel::Receiver<String>,
 482        output_done_tx: barrier::Sender,
 483        response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
 484        io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 485    ) -> anyhow::Result<()>
 486    where
 487        Stdin: AsyncWrite + Unpin + Send + 'static,
 488    {
 489        let mut stdin = BufWriter::new(stdin);
 490        let _clear_response_handlers = util::defer({
 491            let response_handlers = response_handlers.clone();
 492            move || {
 493                response_handlers.lock().take();
 494            }
 495        });
 496        let mut content_len_buffer = Vec::new();
 497        while let Ok(message) = outbound_rx.recv().await {
 498            log::trace!("outgoing message:{}", message);
 499            for handler in io_handlers.lock().values_mut() {
 500                handler(IoKind::StdIn, &message);
 501            }
 502
 503            content_len_buffer.clear();
 504            write!(content_len_buffer, "{}", message.len()).unwrap();
 505            stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
 506            stdin.write_all(&content_len_buffer).await?;
 507            stdin.write_all("\r\n\r\n".as_bytes()).await?;
 508            stdin.write_all(message.as_bytes()).await?;
 509            stdin.flush().await?;
 510        }
 511        drop(output_done_tx);
 512        Ok(())
 513    }
 514
 515    /// Initializes a language server by sending the `Initialize` request.
 516    /// Note that `options` is used directly to construct [`InitializeParams`], which is why it is owned.
 517    ///
 518    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#initialize)
 519    pub fn initialize(
 520        mut self,
 521        options: Option<Value>,
 522        cx: &AppContext,
 523    ) -> Task<Result<Arc<Self>>> {
 524        let root_uri = Url::from_file_path(&self.root_path).unwrap();
 525        #[allow(deprecated)]
 526        let params = InitializeParams {
 527            process_id: None,
 528            root_path: None,
 529            root_uri: Some(root_uri.clone()),
 530            initialization_options: options,
 531            capabilities: ClientCapabilities {
 532                workspace: Some(WorkspaceClientCapabilities {
 533                    configuration: Some(true),
 534                    did_change_watched_files: Some(DidChangeWatchedFilesClientCapabilities {
 535                        dynamic_registration: Some(true),
 536                        relative_pattern_support: Some(true),
 537                    }),
 538                    did_change_configuration: Some(DynamicRegistrationClientCapabilities {
 539                        dynamic_registration: Some(true),
 540                    }),
 541                    workspace_folders: Some(true),
 542                    symbol: Some(WorkspaceSymbolClientCapabilities {
 543                        resolve_support: None,
 544                        ..WorkspaceSymbolClientCapabilities::default()
 545                    }),
 546                    inlay_hint: Some(InlayHintWorkspaceClientCapabilities {
 547                        refresh_support: Some(true),
 548                    }),
 549                    diagnostic: Some(DiagnosticWorkspaceClientCapabilities {
 550                        refresh_support: None,
 551                    }),
 552                    workspace_edit: Some(WorkspaceEditClientCapabilities {
 553                        resource_operations: Some(vec![
 554                            ResourceOperationKind::Create,
 555                            ResourceOperationKind::Rename,
 556                            ResourceOperationKind::Delete,
 557                        ]),
 558                        document_changes: Some(true),
 559                        ..WorkspaceEditClientCapabilities::default()
 560                    }),
 561                    ..Default::default()
 562                }),
 563                text_document: Some(TextDocumentClientCapabilities {
 564                    definition: Some(GotoCapability {
 565                        link_support: Some(true),
 566                        dynamic_registration: None,
 567                    }),
 568                    code_action: Some(CodeActionClientCapabilities {
 569                        code_action_literal_support: Some(CodeActionLiteralSupport {
 570                            code_action_kind: CodeActionKindLiteralSupport {
 571                                value_set: vec![
 572                                    CodeActionKind::REFACTOR.as_str().into(),
 573                                    CodeActionKind::QUICKFIX.as_str().into(),
 574                                    CodeActionKind::SOURCE.as_str().into(),
 575                                ],
 576                            },
 577                        }),
 578                        data_support: Some(true),
 579                        resolve_support: Some(CodeActionCapabilityResolveSupport {
 580                            properties: vec![
 581                                "kind".to_string(),
 582                                "diagnostics".to_string(),
 583                                "isPreferred".to_string(),
 584                                "disabled".to_string(),
 585                                "edit".to_string(),
 586                                "command".to_string(),
 587                            ],
 588                        }),
 589                        ..Default::default()
 590                    }),
 591                    completion: Some(CompletionClientCapabilities {
 592                        completion_item: Some(CompletionItemCapability {
 593                            snippet_support: Some(true),
 594                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
 595                                properties: vec![
 596                                    "documentation".to_string(),
 597                                    "additionalTextEdits".to_string(),
 598                                ],
 599                            }),
 600                            insert_replace_support: Some(true),
 601                            ..Default::default()
 602                        }),
 603                        completion_list: Some(CompletionListCapability {
 604                            item_defaults: Some(vec![
 605                                "commitCharacters".to_owned(),
 606                                "editRange".to_owned(),
 607                                "insertTextMode".to_owned(),
 608                                "data".to_owned(),
 609                            ]),
 610                        }),
 611                        ..Default::default()
 612                    }),
 613                    rename: Some(RenameClientCapabilities {
 614                        prepare_support: Some(true),
 615                        ..Default::default()
 616                    }),
 617                    hover: Some(HoverClientCapabilities {
 618                        content_format: Some(vec![MarkupKind::Markdown]),
 619                        dynamic_registration: None,
 620                    }),
 621                    inlay_hint: Some(InlayHintClientCapabilities {
 622                        resolve_support: Some(InlayHintResolveClientCapabilities {
 623                            properties: vec![
 624                                "textEdits".to_string(),
 625                                "tooltip".to_string(),
 626                                "label.tooltip".to_string(),
 627                                "label.location".to_string(),
 628                                "label.command".to_string(),
 629                            ],
 630                        }),
 631                        dynamic_registration: Some(false),
 632                    }),
 633                    publish_diagnostics: Some(PublishDiagnosticsClientCapabilities {
 634                        related_information: Some(true),
 635                        ..Default::default()
 636                    }),
 637                    formatting: Some(DynamicRegistrationClientCapabilities {
 638                        dynamic_registration: None,
 639                    }),
 640                    on_type_formatting: Some(DynamicRegistrationClientCapabilities {
 641                        dynamic_registration: None,
 642                    }),
 643                    diagnostic: Some(DiagnosticClientCapabilities {
 644                        related_document_support: Some(true),
 645                        dynamic_registration: None,
 646                    }),
 647                    ..Default::default()
 648                }),
 649                experimental: Some(json!({
 650                    "serverStatusNotification": true,
 651                })),
 652                window: Some(WindowClientCapabilities {
 653                    work_done_progress: Some(true),
 654                    ..Default::default()
 655                }),
 656                general: None,
 657            },
 658            trace: None,
 659            workspace_folders: Some(vec![WorkspaceFolder {
 660                uri: root_uri,
 661                name: Default::default(),
 662            }]),
 663            client_info: release_channel::ReleaseChannel::try_global(cx).map(|release_channel| {
 664                ClientInfo {
 665                    name: release_channel.display_name().to_string(),
 666                    version: Some(release_channel::AppVersion::global(cx).to_string()),
 667                }
 668            }),
 669            locale: None,
 670        };
 671
 672        cx.spawn(|_| async move {
 673            let response = self.request::<request::Initialize>(params).await?;
 674            if let Some(info) = response.server_info {
 675                self.name = info.name.into();
 676            }
 677            self.capabilities = response.capabilities;
 678
 679            self.notify::<notification::Initialized>(InitializedParams {})?;
 680            Ok(Arc::new(self))
 681        })
 682    }
 683
 684    /// Sends a shutdown request to the language server process and prepares the [`LanguageServer`] to be dropped.
 685    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
 686        if let Some(tasks) = self.io_tasks.lock().take() {
 687            let response_handlers = self.response_handlers.clone();
 688            let next_id = AtomicI32::new(self.next_id.load(SeqCst));
 689            let outbound_tx = self.outbound_tx.clone();
 690            let executor = self.executor.clone();
 691            let mut output_done = self.output_done_rx.lock().take().unwrap();
 692            let shutdown_request = Self::request_internal::<request::Shutdown>(
 693                &next_id,
 694                &response_handlers,
 695                &outbound_tx,
 696                &executor,
 697                (),
 698            );
 699            let exit = Self::notify_internal::<notification::Exit>(&outbound_tx, ());
 700            outbound_tx.close();
 701
 702            let server = self.server.clone();
 703            let name = self.name.clone();
 704            let mut timer = self.executor.timer(SERVER_SHUTDOWN_TIMEOUT).fuse();
 705            Some(
 706                async move {
 707                    log::debug!("language server shutdown started");
 708
 709                    select! {
 710                        request_result = shutdown_request.fuse() => {
 711                            request_result?;
 712                        }
 713
 714                        _ = timer => {
 715                            log::info!("timeout waiting for language server {name} to shutdown");
 716                        },
 717                    }
 718
 719                    response_handlers.lock().take();
 720                    exit?;
 721                    output_done.recv().await;
 722                    server.lock().take().map(|mut child| child.kill());
 723                    log::debug!("language server shutdown finished");
 724
 725                    drop(tasks);
 726                    anyhow::Ok(())
 727                }
 728                .log_err(),
 729            )
 730        } else {
 731            None
 732        }
 733    }
 734
 735    /// Register a handler to handle incoming LSP notifications.
 736    ///
 737    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
 738    #[must_use]
 739    pub fn on_notification<T, F>(&self, f: F) -> Subscription
 740    where
 741        T: notification::Notification,
 742        F: 'static + Send + FnMut(T::Params, AsyncAppContext),
 743    {
 744        self.on_custom_notification(T::METHOD, f)
 745    }
 746
 747    /// Register a handler to handle incoming LSP requests.
 748    ///
 749    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
 750    #[must_use]
 751    pub fn on_request<T, F, Fut>(&self, f: F) -> Subscription
 752    where
 753        T: request::Request,
 754        T::Params: 'static + Send,
 755        F: 'static + FnMut(T::Params, AsyncAppContext) -> Fut + Send,
 756        Fut: 'static + Future<Output = Result<T::Result>>,
 757    {
 758        self.on_custom_request(T::METHOD, f)
 759    }
 760
 761    /// Registers a handler to inspect all language server process stdio.
 762    #[must_use]
 763    pub fn on_io<F>(&self, f: F) -> Subscription
 764    where
 765        F: 'static + Send + FnMut(IoKind, &str),
 766    {
 767        let id = self.next_id.fetch_add(1, SeqCst);
 768        self.io_handlers.lock().insert(id, Box::new(f));
 769        Subscription::Io {
 770            id,
 771            io_handlers: Some(Arc::downgrade(&self.io_handlers)),
 772        }
 773    }
 774
    /// Removes a request handler registered via [`Self::on_request`].
    ///
    /// Request handlers are stored in the same map as notification handlers, keyed by method
    /// name, which is why the entry is removed from `notification_handlers`.
    pub fn remove_request_handler<T: request::Request>(&self) {
        self.notification_handlers.lock().remove(T::METHOD);
    }
 779
    /// Removes a notification handler registered via [`Self::on_notification`].
    ///
    /// Does nothing if no handler is registered for `T::METHOD`.
    pub fn remove_notification_handler<T: notification::Notification>(&self) {
        self.notification_handlers.lock().remove(T::METHOD);
    }
 784
    /// Checks if a notification handler has been registered via [`Self::on_notification`].
    ///
    /// The lookup is by method name only. Request handlers are kept in the same map, so a
    /// request handler registered under the same method name is reported as well.
    pub fn has_notification_handler<T: notification::Notification>(&self) -> bool {
        self.notification_handlers.lock().contains_key(T::METHOD)
    }
 789
 790    #[must_use]
 791    fn on_custom_notification<Params, F>(&self, method: &'static str, mut f: F) -> Subscription
 792    where
 793        F: 'static + FnMut(Params, AsyncAppContext) + Send,
 794        Params: DeserializeOwned,
 795    {
 796        let prev_handler = self.notification_handlers.lock().insert(
 797            method,
 798            Box::new(move |_, params, cx| {
 799                if let Some(params) = serde_json::from_str(params).log_err() {
 800                    f(params, cx);
 801                }
 802            }),
 803        );
 804        assert!(
 805            prev_handler.is_none(),
 806            "registered multiple handlers for the same LSP method"
 807        );
 808        Subscription::Notification {
 809            method,
 810            notification_handlers: Some(self.notification_handlers.clone()),
 811        }
 812    }
 813
 814    #[must_use]
 815    fn on_custom_request<Params, Res, Fut, F>(&self, method: &'static str, mut f: F) -> Subscription
 816    where
 817        F: 'static + FnMut(Params, AsyncAppContext) -> Fut + Send,
 818        Fut: 'static + Future<Output = Result<Res>>,
 819        Params: DeserializeOwned + Send + 'static,
 820        Res: Serialize,
 821    {
 822        let outbound_tx = self.outbound_tx.clone();
 823        let prev_handler = self.notification_handlers.lock().insert(
 824            method,
 825            Box::new(move |id, params, cx| {
 826                if let Some(id) = id {
 827                    match serde_json::from_str(params) {
 828                        Ok(params) => {
 829                            let response = f(params, cx.clone());
 830                            cx.foreground_executor()
 831                                .spawn({
 832                                    let outbound_tx = outbound_tx.clone();
 833                                    async move {
 834                                        let response = match response.await {
 835                                            Ok(result) => Response {
 836                                                jsonrpc: JSON_RPC_VERSION,
 837                                                id,
 838                                                result: Some(result),
 839                                                error: None,
 840                                            },
 841                                            Err(error) => Response {
 842                                                jsonrpc: JSON_RPC_VERSION,
 843                                                id,
 844                                                result: None,
 845                                                error: Some(Error {
 846                                                    message: error.to_string(),
 847                                                }),
 848                                            },
 849                                        };
 850                                        if let Some(response) =
 851                                            serde_json::to_string(&response).log_err()
 852                                        {
 853                                            outbound_tx.try_send(response).ok();
 854                                        }
 855                                    }
 856                                })
 857                                .detach();
 858                        }
 859
 860                        Err(error) => {
 861                            log::error!(
 862                                "error deserializing {} request: {:?}, message: {:?}",
 863                                method,
 864                                error,
 865                                params
 866                            );
 867                            let response = AnyResponse {
 868                                jsonrpc: JSON_RPC_VERSION,
 869                                id,
 870                                result: None,
 871                                error: Some(Error {
 872                                    message: error.to_string(),
 873                                }),
 874                            };
 875                            if let Some(response) = serde_json::to_string(&response).log_err() {
 876                                outbound_tx.try_send(response).ok();
 877                            }
 878                        }
 879                    }
 880                }
 881            }),
 882        );
 883        assert!(
 884            prev_handler.is_none(),
 885            "registered multiple handlers for the same LSP method"
 886        );
 887        Subscription::Notification {
 888            method,
 889            notification_handlers: Some(self.notification_handlers.clone()),
 890        }
 891    }
 892
    /// Get the name of the running language server.
    ///
    /// Returns a borrow of the stored name; no allocation takes place.
    pub fn name(&self) -> &str {
        &self.name
    }
 897
    /// Get the reported capabilities of the running language server.
    ///
    /// Returns a borrow of the capabilities stored on this instance.
    pub fn capabilities(&self) -> &ServerCapabilities {
        &self.capabilities
    }
 902
    /// Get the id of the running language server.
    ///
    /// The id is returned by value.
    pub fn server_id(&self) -> LanguageServerId {
        self.server_id
    }
 907
    /// Get the root path of the project the language server is running against.
    ///
    /// Returns a borrow of the stored path; clone it if an owned `PathBuf` is needed.
    pub fn root_path(&self) -> &PathBuf {
        &self.root_path
    }
 912
    /// Sends a RPC request to the language server.
    ///
    /// Serialization of the request and queuing it on the outbound channel happen eagerly,
    /// when this method is called, rather than when the returned future is first polled.
    ///
    /// The future resolves to the deserialized response. It resolves to an error if the server
    /// has shut down, the message could not be queued, the server answered with an error, the
    /// response could not be deserialized, or no response arrived within
    /// `LSP_REQUEST_TIMEOUT`.
    ///
    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
    pub fn request<T: request::Request>(
        &self,
        params: T::Params,
    ) -> impl Future<Output = Result<T::Result>>
    where
        T::Result: 'static + Send,
    {
        Self::request_internal::<T>(
            &self.next_id,
            &self.response_handlers,
            &self.outbound_tx,
            &self.executor,
            params,
        )
    }
 931
 932    fn request_internal<T: request::Request>(
 933        next_id: &AtomicI32,
 934        response_handlers: &Mutex<Option<HashMap<RequestId, ResponseHandler>>>,
 935        outbound_tx: &channel::Sender<String>,
 936        executor: &BackgroundExecutor,
 937        params: T::Params,
 938    ) -> impl 'static + Future<Output = anyhow::Result<T::Result>>
 939    where
 940        T::Result: 'static + Send,
 941    {
 942        let id = next_id.fetch_add(1, SeqCst);
 943        let message = serde_json::to_string(&Request {
 944            jsonrpc: JSON_RPC_VERSION,
 945            id: RequestId::Int(id),
 946            method: T::METHOD,
 947            params,
 948        })
 949        .unwrap();
 950
 951        let (tx, rx) = oneshot::channel();
 952        let handle_response = response_handlers
 953            .lock()
 954            .as_mut()
 955            .ok_or_else(|| anyhow!("server shut down"))
 956            .map(|handlers| {
 957                let executor = executor.clone();
 958                handlers.insert(
 959                    RequestId::Int(id),
 960                    Box::new(move |result| {
 961                        executor
 962                            .spawn(async move {
 963                                let response = match result {
 964                                    Ok(response) => match serde_json::from_str(&response) {
 965                                        Ok(deserialized) => Ok(deserialized),
 966                                        Err(error) => {
 967                                            log::error!("failed to deserialize response from language server: {}. response from language server: {:?}", error, response);
 968                                            Err(error).context("failed to deserialize response")
 969                                        }
 970                                    }
 971                                    Err(error) => Err(anyhow!("{}", error.message)),
 972                                };
 973                                _ = tx.send(response);
 974                            })
 975                            .detach();
 976                    }),
 977                );
 978            });
 979
 980        let send = outbound_tx
 981            .try_send(message)
 982            .context("failed to write to language server's stdin");
 983
 984        let outbound_tx = outbound_tx.downgrade();
 985        let mut timeout = executor.timer(LSP_REQUEST_TIMEOUT).fuse();
 986        let started = Instant::now();
 987        async move {
 988            handle_response?;
 989            send?;
 990
 991            let cancel_on_drop = util::defer(move || {
 992                if let Some(outbound_tx) = outbound_tx.upgrade() {
 993                    Self::notify_internal::<notification::Cancel>(
 994                        &outbound_tx,
 995                        CancelParams {
 996                            id: NumberOrString::Number(id),
 997                        },
 998                    )
 999                    .log_err();
1000                }
1001            });
1002
1003            let method = T::METHOD;
1004            select! {
1005                response = rx.fuse() => {
1006                    let elapsed = started.elapsed();
1007                    log::trace!("Took {elapsed:?} to receive response to {method:?} id {id}");
1008                    cancel_on_drop.abort();
1009                    response?
1010                }
1011
1012                _ = timeout => {
1013                    log::error!("Cancelled LSP request task for {method:?} id {id} which took over {LSP_REQUEST_TIMEOUT:?}");
1014                    anyhow::bail!("LSP request timeout");
1015                }
1016            }
1017        }
1018    }
1019
    /// Sends a RPC notification to the language server.
    ///
    /// The notification is serialized and queued on the outbound channel without waiting.
    ///
    /// # Errors
    ///
    /// Returns an error if the message cannot be queued on the outbound channel.
    ///
    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
    pub fn notify<T: notification::Notification>(&self, params: T::Params) -> Result<()> {
        Self::notify_internal::<T>(&self.outbound_tx, params)
    }
1026
1027    fn notify_internal<T: notification::Notification>(
1028        outbound_tx: &channel::Sender<String>,
1029        params: T::Params,
1030    ) -> Result<()> {
1031        let message = serde_json::to_string(&Notification {
1032            jsonrpc: JSON_RPC_VERSION,
1033            method: T::METHOD,
1034            params,
1035        })
1036        .unwrap();
1037        outbound_tx.try_send(message)?;
1038        Ok(())
1039    }
1040}
1041
1042impl Drop for LanguageServer {
1043    fn drop(&mut self) {
1044        if let Some(shutdown) = self.shutdown() {
1045            self.executor.spawn(shutdown).detach();
1046        }
1047    }
1048}
1049
1050impl Subscription {
1051    /// Detaching a subscription handle prevents it from unsubscribing on drop.
1052    pub fn detach(&mut self) {
1053        match self {
1054            Subscription::Notification {
1055                notification_handlers,
1056                ..
1057            } => *notification_handlers = None,
1058            Subscription::Io { io_handlers, .. } => *io_handlers = None,
1059        }
1060    }
1061}
1062
impl fmt::Display for LanguageServerId {
    /// Formats the id by delegating to the wrapped value's `fmt`, passing the formatter
    /// through unchanged.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0.fmt(f)
    }
}
1068
1069impl fmt::Debug for LanguageServer {
1070    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1071        f.debug_struct("LanguageServer")
1072            .field("id", &self.server_id.0)
1073            .field("name", &self.name)
1074            .finish_non_exhaustive()
1075    }
1076}
1077
1078impl Drop for Subscription {
1079    fn drop(&mut self) {
1080        match self {
1081            Subscription::Notification {
1082                method,
1083                notification_handlers,
1084            } => {
1085                if let Some(handlers) = notification_handlers {
1086                    handlers.lock().remove(method);
1087                }
1088            }
1089            Subscription::Io { id, io_handlers } => {
1090                if let Some(io_handlers) = io_handlers.as_ref().and_then(|h| h.upgrade()) {
1091                    io_handlers.lock().remove(id);
1092                }
1093            }
1094        }
1095    }
1096}
1097
/// Mock language server for use in tests.
///
/// Created together with its client-side counterpart by [`FakeLanguageServer::new`].
#[cfg(any(test, feature = "test-support"))]
#[derive(Clone)]
pub struct FakeLanguageServer {
    /// The binary description this fake was constructed with.
    pub binary: LanguageServerBinary,
    /// The server-side endpoint used to send messages to, and register handlers for messages
    /// from, the client under test.
    pub server: Arc<LanguageServer>,
    /// Receives `(method, params)` pairs forwarded by the callback installed in `new`; the
    /// params are the raw JSON text, or `"null"` when the message carried none.
    notifications_rx: channel::Receiver<(String, String)>,
}
1106
#[cfg(any(test, feature = "test-support"))]
impl FakeLanguageServer {
    /// Construct a fake language server.
    ///
    /// Returns the client-side [`LanguageServer`] to hand to the code under test together with
    /// the [`FakeLanguageServer`] that plays the server's role. The two are connected through
    /// a pair of in-memory pipes: what one endpoint writes, the other reads. Both endpoints
    /// are given `name`, and the fake answers `Initialize` requests with the supplied
    /// `capabilities` and a `ServerInfo` carrying `name`.
    pub fn new(
        server_id: LanguageServerId,
        binary: LanguageServerBinary,
        name: String,
        capabilities: ServerCapabilities,
        cx: AsyncAppContext,
    ) -> (LanguageServer, FakeLanguageServer) {
        // One pipe per direction, plus a channel on which the fake collects the messages
        // forwarded by its callback below.
        let (stdin_writer, stdin_reader) = async_pipe::pipe();
        let (stdout_writer, stdout_reader) = async_pipe::pipe();
        let (notifications_tx, notifications_rx) = channel::unbounded();

        // Client endpoint: writes to the "stdin" pipe and reads from the "stdout" pipe.
        let mut server = LanguageServer::new_internal(
            server_id,
            stdin_writer,
            stdout_reader,
            None::<async_pipe::PipeReader>,
            Arc::new(Mutex::new(None)),
            None,
            Path::new("/"),
            None,
            cx.clone(),
            |_| {},
        );
        server.name = name.as_str().into();
        let fake = FakeLanguageServer {
            binary,
            // Fake endpoint: the mirror image of the client, writing to "stdout" and reading
            // from "stdin".
            server: Arc::new({
                let mut server = LanguageServer::new_internal(
                    server_id,
                    stdout_writer,
                    stdin_reader,
                    None::<async_pipe::PipeReader>,
                    Arc::new(Mutex::new(None)),
                    None,
                    Path::new("/"),
                    None,
                    cx,
                    // NOTE(review): presumably invoked for incoming messages that have no
                    // registered handler — confirm against `new_internal`. Each message is
                    // forwarded as (method, raw params JSON), with "null" standing in for
                    // missing params; a failed send is ignored.
                    move |msg| {
                        notifications_tx
                            .try_send((
                                msg.method.to_string(),
                                msg.params
                                    .map(|raw_value| raw_value.get())
                                    .unwrap_or("null")
                                    .to_string(),
                            ))
                            .ok();
                    },
                );
                server.name = name.as_str().into();
                server
            }),
            notifications_rx,
        };
        // Answer initialization with the configured capabilities. The receiver returned by
        // `handle_request` is not needed here and is dropped.
        fake.handle_request::<request::Initialize, _, _>({
            let capabilities = capabilities;
            move |_, _| {
                // The handler may run more than once, so each call gets its own copies.
                let capabilities = capabilities.clone();
                let name = name.clone();
                async move {
                    Ok(InitializeResult {
                        capabilities,
                        server_info: Some(ServerInfo {
                            name,
                            ..Default::default()
                        }),
                    })
                }
            }
        });

        (server, fake)
    }
}
1184
#[cfg(any(test, feature = "test-support"))]
impl LanguageServer {
    /// Returns server capabilities for use in tests, with the following providers enabled:
    /// document highlight, code action, document formatting, document range formatting,
    /// definition, implementation and type definition. Every other capability keeps its
    /// default value.
    pub fn full_capabilities() -> ServerCapabilities {
        ServerCapabilities {
            document_highlight_provider: Some(OneOf::Left(true)),
            code_action_provider: Some(CodeActionProviderCapability::Simple(true)),
            document_formatting_provider: Some(OneOf::Left(true)),
            document_range_formatting_provider: Some(OneOf::Left(true)),
            definition_provider: Some(OneOf::Left(true)),
            implementation_provider: Some(ImplementationProviderCapability::Simple(true)),
            type_definition_provider: Some(TypeDefinitionProviderCapability::Simple(true)),
            ..Default::default()
        }
    }
}
1200
#[cfg(any(test, feature = "test-support"))]
impl FakeLanguageServer {
    /// See [`LanguageServer::notify`]. Failures to queue the notification are ignored.
    pub fn notify<T: notification::Notification>(&self, params: T::Params) {
        let _ = self.server.notify::<T>(params);
    }

    /// See [`LanguageServer::request`].
    pub async fn request<T>(&self, params: T::Params) -> Result<T::Result>
    where
        T: request::Request,
        T::Result: 'static + Send,
    {
        self.server.executor.start_waiting();
        let response = self.server.request::<T>(params);
        response.await
    }

    /// Waits for the next notification of type `T` via [`Self::try_receive_notification`].
    ///
    /// # Panics
    ///
    /// Panics if the notification channel ends before such a notification is received.
    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
        self.server.executor.start_waiting();
        let received = self.try_receive_notification::<T>().await;
        received.unwrap()
    }

    /// Consumes the notification channel until it finds a notification for the specified type.
    ///
    /// Messages for other methods are logged and discarded. Returns `None` once the channel
    /// yields no more messages.
    ///
    /// # Panics
    ///
    /// Panics if the matching notification's parameters cannot be deserialized as `T::Params`.
    pub async fn try_receive_notification<T: notification::Notification>(
        &mut self,
    ) -> Option<T::Params> {
        use futures::StreamExt as _;

        while let Some((method, params)) = self.notifications_rx.next().await {
            if method != T::METHOD {
                log::info!("skipping message in fake language server {:?}", params);
                continue;
            }
            return Some(serde_json::from_str::<T::Params>(&params).unwrap());
        }
        None
    }

    /// Registers a handler for a specific kind of request. Removes any existing handler for specified request type.
    ///
    /// The returned receiver gets one `()` each time the handler's future has completed.
    pub fn handle_request<T, F, Fut>(
        &self,
        mut handler: F,
    ) -> futures::channel::mpsc::UnboundedReceiver<()>
    where
        T: 'static + request::Request,
        T::Params: 'static + Send,
        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> Fut,
        Fut: 'static + Send + Future<Output = Result<T::Result>>,
    {
        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
        self.server.remove_request_handler::<T>();

        let mut subscription = self.server.on_request::<T, _, _>(move |params, cx| {
            // Call the handler right away; only awaiting its future is delayed.
            let pending = handler(params, cx.clone());
            let done_tx = responded_tx.clone();
            let background = cx.background_executor().clone();
            async move {
                background.simulate_random_delay().await;
                let outcome = pending.await;
                done_tx.unbounded_send(()).ok();
                outcome
            }
        });
        // Keep the handler registered after the subscription handle goes out of scope.
        subscription.detach();

        responded_rx
    }

    /// Registers a handler for a specific kind of notification. Removes any existing handler for specified notification type.
    ///
    /// The returned receiver gets one `()` each time the handler has run.
    pub fn handle_notification<T, F>(
        &self,
        mut handler: F,
    ) -> futures::channel::mpsc::UnboundedReceiver<()>
    where
        T: 'static + notification::Notification,
        T::Params: 'static + Send,
        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext),
    {
        let (handled_tx, handled_rx) = futures::channel::mpsc::unbounded();
        self.server.remove_notification_handler::<T>();

        let mut subscription = self.server.on_notification::<T, _>(move |params, cx| {
            handler(params, cx.clone());
            handled_tx.unbounded_send(()).ok();
        });
        // Keep the handler registered after the subscription handle goes out of scope.
        subscription.detach();

        handled_rx
    }

    /// Removes any existing handler for specified request type.
    pub fn remove_request_handler<T>(&mut self)
    where
        T: 'static + request::Request,
    {
        self.server.remove_request_handler::<T>();
    }

    /// Simulate that the server has started work and notifies about its progress with the specified token.
    ///
    /// # Panics
    ///
    /// Panics if the request that creates the progress token fails.
    pub async fn start_progress(&self, token: impl Into<String>) {
        let token: String = token.into();

        let create_params = WorkDoneProgressCreateParams {
            token: NumberOrString::String(token.clone()),
        };
        self.request::<request::WorkDoneProgressCreate>(create_params)
            .await
            .unwrap();

        let begin = WorkDoneProgress::Begin(Default::default());
        self.notify::<notification::Progress>(ProgressParams {
            token: NumberOrString::String(token),
            value: ProgressParamsValue::WorkDone(begin),
        });
    }

    /// Simulate that the server has completed work and notifies about that with the specified token.
    pub fn end_progress(&self, token: impl Into<String>) {
        let end = WorkDoneProgress::End(Default::default());
        self.notify::<notification::Progress>(ProgressParams {
            token: NumberOrString::String(token.into()),
            value: ProgressParamsValue::WorkDone(end),
        });
    }
}
1320
1321pub(self) async fn read_headers<Stdout>(
1322    reader: &mut BufReader<Stdout>,
1323    buffer: &mut Vec<u8>,
1324) -> Result<()>
1325where
1326    Stdout: AsyncRead + Unpin + Send + 'static,
1327{
1328    loop {
1329        if buffer.len() >= HEADER_DELIMITER.len()
1330            && buffer[(buffer.len() - HEADER_DELIMITER.len())..] == HEADER_DELIMITER[..]
1331        {
1332            return Ok(());
1333        }
1334
1335        if reader.read_until(b'\n', buffer).await? == 0 {
1336            return Err(anyhow!("cannot read LSP message headers"));
1337        }
1338    }
1339}
1340
#[cfg(test)]
mod tests {
    use super::*;
    use gpui::TestAppContext;

    /// Initializes logging once for the test binary, but only when `RUST_LOG` is set.
    #[ctor::ctor]
    fn init_logger() {
        if std::env::var("RUST_LOG").is_ok() {
            env_logger::init();
        }
    }

    /// Round-trips notifications in both directions between a client and the fake server,
    /// then checks that dropping the client makes the fake receive an `Exit` notification.
    #[gpui::test]
    async fn test_fake(cx: &mut TestAppContext) {
        cx.update(|cx| {
            release_channel::init("0.0.0", cx);
        });
        let (server, mut fake) = FakeLanguageServer::new(
            LanguageServerId(0),
            LanguageServerBinary {
                path: "path/to/language-server".into(),
                arguments: vec![],
                env: None,
            },
            "the-lsp".to_string(),
            Default::default(),
            cx.to_async(),
        );

        let (message_tx, message_rx) = channel::unbounded();
        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
        server
            .on_notification::<notification::ShowMessage, _>(move |params, _| {
                message_tx.try_send(params).unwrap()
            })
            .detach();
        server
            .on_notification::<notification::PublishDiagnostics, _>(move |params, _| {
                diagnostics_tx.try_send(params).unwrap()
            })
            .detach();

        // Client -> fake.
        let server = cx.update(|cx| server.initialize(None, cx)).await.unwrap();
        server
            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
                text_document: TextDocumentItem::new(
                    Url::from_str("file://a/b").unwrap(),
                    "rust".to_string(),
                    0,
                    "".to_string(),
                ),
            })
            .unwrap();
        assert_eq!(
            fake.receive_notification::<notification::DidOpenTextDocument>()
                .await
                .text_document
                .uri
                .as_str(),
            "file://a/b"
        );

        // Fake -> client.
        fake.notify::<notification::ShowMessage>(ShowMessageParams {
            typ: MessageType::ERROR,
            message: "ok".to_string(),
        });
        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
            uri: Url::from_str("file://b/c").unwrap(),
            version: Some(5),
            diagnostics: vec![],
        });
        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
        assert_eq!(
            diagnostics_rx.recv().await.unwrap().uri.as_str(),
            "file://b/c"
        );

        fake.handle_request::<request::Shutdown, _, _>(|_, _| async move { Ok(()) });

        drop(server);
        fake.receive_notification::<notification::Exit>().await;
    }

    /// Checks that `read_headers` stops right after the blank line that ends the headers,
    /// regardless of header order, and leaves any message body unread.
    #[gpui::test]
    async fn test_read_headers() {
        let mut buf = Vec::new();
        let mut reader = smol::io::BufReader::new(b"Content-Length: 123\r\n\r\n" as &[u8]);
        read_headers(&mut reader, &mut buf).await.unwrap();
        assert_eq!(buf, b"Content-Length: 123\r\n\r\n");

        let mut buf = Vec::new();
        let mut reader = smol::io::BufReader::new(b"Content-Type: application/vscode-jsonrpc\r\nContent-Length: 1235\r\n\r\n{\"somecontent\":123}" as &[u8]);
        read_headers(&mut reader, &mut buf).await.unwrap();
        assert_eq!(
            buf,
            b"Content-Type: application/vscode-jsonrpc\r\nContent-Length: 1235\r\n\r\n"
        );

        let mut buf = Vec::new();
        let mut reader = smol::io::BufReader::new(b"Content-Length: 1235\r\nContent-Type: application/vscode-jsonrpc\r\n\r\n{\"somecontent\":true}" as &[u8]);
        read_headers(&mut reader, &mut buf).await.unwrap();
        assert_eq!(
            buf,
            b"Content-Length: 1235\r\nContent-Type: application/vscode-jsonrpc\r\n\r\n"
        );
    }

    /// A string id made of digits must stay a string id.
    #[gpui::test]
    fn test_deserialize_string_digit_id() {
        let json = r#"{"jsonrpc":"2.0","id":"2","method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
        let notification = serde_json::from_str::<AnyNotification>(json)
            .expect("message with string id should be parsed");
        let expected_id = RequestId::Str("2".to_string());
        assert_eq!(notification.id, Some(expected_id));
    }

    /// An arbitrary string id is accepted.
    #[gpui::test]
    fn test_deserialize_string_id() {
        let json = r#"{"jsonrpc":"2.0","id":"anythingAtAll","method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
        let notification = serde_json::from_str::<AnyNotification>(json)
            .expect("message with string id should be parsed");
        let expected_id = RequestId::Str("anythingAtAll".to_string());
        assert_eq!(notification.id, Some(expected_id));
    }

    /// A numeric id is parsed as an integer id.
    #[gpui::test]
    fn test_deserialize_int_id() {
        let json = r#"{"jsonrpc":"2.0","id":2,"method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
        let notification = serde_json::from_str::<AnyNotification>(json)
            .expect("message with int id should be parsed");
        let expected_id = RequestId::Int(2);
        assert_eq!(notification.id, Some(expected_id));
    }
}