lsp.rs

   1mod input_handler;
   2
   3pub use lsp_types::request::*;
   4pub use lsp_types::*;
   5
   6use anyhow::{Context as _, Result, anyhow};
   7use collections::{BTreeMap, HashMap};
   8use futures::{
   9    AsyncRead, AsyncWrite, Future, FutureExt,
  10    channel::oneshot::{self, Canceled},
  11    future::{self, Either},
  12    io::BufWriter,
  13    select,
  14};
  15use gpui::{App, AppContext as _, AsyncApp, BackgroundExecutor, SharedString, Task};
  16use notification::DidChangeWorkspaceFolders;
  17use parking_lot::{Mutex, RwLock};
  18use postage::{barrier, prelude::Stream};
  19use schemars::JsonSchema;
  20use serde::{Deserialize, Serialize, de::DeserializeOwned};
  21use serde_json::{Value, json, value::RawValue};
  22use smol::{
  23    channel,
  24    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
  25};
  26use util::command::{Child, Stdio};
  27
  28use std::path::Path;
  29use std::{
  30    any::TypeId,
  31    collections::BTreeSet,
  32    ffi::{OsStr, OsString},
  33    fmt,
  34    io::Write,
  35    ops::DerefMut,
  36    path::PathBuf,
  37    pin::Pin,
  38    sync::{
  39        Arc, Weak,
  40        atomic::{AtomicI32, Ordering::SeqCst},
  41    },
  42    task::Poll,
  43    time::{Duration, Instant},
  44};
  45use util::{ConnectionResult, ResultExt, TryFutureExt, redact};
  46
  47const JSON_RPC_VERSION: &str = "2.0";
  48const CONTENT_LEN_HEADER: &str = "Content-Length: ";
  49
  50/// The default amount of time to wait while initializing or fetching LSP servers, in seconds.
  51///
  52/// Should not be used (in favor of DEFAULT_LSP_REQUEST_TIMEOUT) and is exported solely for use inside ProjectSettings defaults.
  53pub const DEFAULT_LSP_REQUEST_TIMEOUT_SECS: u64 = 120;
  54/// A timeout representing the value of [DEFAULT_LSP_REQUEST_TIMEOUT_SECS].
  55///
  56/// Should **only be used** in tests and as a fallback when a corresponding config value cannot be obtained!
  57pub const DEFAULT_LSP_REQUEST_TIMEOUT: Duration =
  58    Duration::from_secs(DEFAULT_LSP_REQUEST_TIMEOUT_SECS);
  59
  60/// The shutdown timeout for LSP servers (including Prettier/Copilot).
  61const SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
  62
  63type NotificationHandler = Box<dyn Send + FnMut(Option<RequestId>, Value, &mut AsyncApp)>;
  64type PendingRespondTasks = Arc<Mutex<HashMap<RequestId, Task<()>>>>;
  65type ResponseHandler = Box<dyn Send + FnOnce(Result<String, Error>) -> Task<()>>;
  66type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
  67
  68/// Kind of language server stdio given to an IO handler.
  69#[derive(Debug, Clone, Copy)]
  70pub enum IoKind {
  71    StdOut,
  72    StdIn,
  73    StdErr,
  74}
  75
  76/// Represents a launchable language server. This can either be a standalone binary or the path
  77/// to a runtime with arguments to instruct it to launch the actual language server file.
  78#[derive(Clone, Serialize)]
  79pub struct LanguageServerBinary {
  80    pub path: PathBuf,
  81    pub arguments: Vec<OsString>,
  82    pub env: Option<HashMap<String, String>>,
  83}
  84
  85/// Configures the search (and installation) of language servers.
  86#[derive(Debug, Clone)]
  87pub struct LanguageServerBinaryOptions {
  88    /// Whether the adapter should look at the users system
  89    pub allow_path_lookup: bool,
  90    /// Whether the adapter should download its own version
  91    pub allow_binary_download: bool,
  92    /// Whether the adapter should download a pre-release version
  93    pub pre_release: bool,
  94}
  95
  96struct NotificationSerializer(Box<dyn FnOnce() -> String + Send + Sync>);
  97
  98/// A running language server process.
  99pub struct LanguageServer {
 100    server_id: LanguageServerId,
 101    next_id: AtomicI32,
 102    outbound_tx: channel::Sender<String>,
 103    notification_tx: channel::Sender<NotificationSerializer>,
 104    name: LanguageServerName,
 105    version: Option<SharedString>,
 106    process_name: Arc<str>,
 107    binary: LanguageServerBinary,
 108    capabilities: RwLock<ServerCapabilities>,
 109    /// Configuration sent to the server, stored for display in the language server logs
 110    /// buffer. This is represented as the message sent to the LSP in order to avoid cloning it (can
 111    /// be large in cases like sending schemas to the json server).
 112    configuration: Arc<DidChangeConfigurationParams>,
 113    code_action_kinds: Option<Vec<CodeActionKind>>,
 114    notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
 115    response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
 116    /// Tasks spawned by `on_custom_request` to compute responses. Tracked so that
 117    /// incoming `$/cancelRequest` notifications can cancel them by dropping the task.
 118    pending_respond_tasks: PendingRespondTasks,
 119    io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 120    executor: BackgroundExecutor,
 121    #[allow(clippy::type_complexity)]
 122    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
 123    output_done_rx: Mutex<Option<barrier::Receiver>>,
 124    server: Arc<Mutex<Option<Child>>>,
 125    workspace_folders: Option<Arc<Mutex<BTreeSet<Uri>>>>,
 126    root_uri: Uri,
 127}
 128
 129#[derive(Clone, Debug, PartialEq, Eq, Hash)]
 130pub enum LanguageServerSelector {
 131    Id(LanguageServerId),
 132    Name(LanguageServerName),
 133}
 134
 135/// Identifies a running language server.
 136#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
 137#[repr(transparent)]
 138pub struct LanguageServerId(pub usize);
 139
 140impl LanguageServerId {
 141    pub fn from_proto(id: u64) -> Self {
 142        Self(id as usize)
 143    }
 144
 145    pub fn to_proto(self) -> u64 {
 146        self.0 as u64
 147    }
 148}
 149
 150/// A name of a language server.
 151#[derive(
 152    Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize, JsonSchema,
 153)]
 154#[serde(transparent)]
 155pub struct LanguageServerName(pub SharedString);
 156
 157impl std::fmt::Display for LanguageServerName {
 158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 159        std::fmt::Display::fmt(&self.0, f)
 160    }
 161}
 162
 163impl AsRef<str> for LanguageServerName {
 164    fn as_ref(&self) -> &str {
 165        self.0.as_ref()
 166    }
 167}
 168
 169impl AsRef<OsStr> for LanguageServerName {
 170    fn as_ref(&self) -> &OsStr {
 171        self.0.as_ref().as_ref()
 172    }
 173}
 174
 175impl LanguageServerName {
 176    pub const fn new_static(s: &'static str) -> Self {
 177        Self(SharedString::new_static(s))
 178    }
 179
 180    pub fn from_proto(s: String) -> Self {
 181        Self(s.into())
 182    }
 183}
 184
 185impl<'a> From<&'a str> for LanguageServerName {
 186    fn from(str: &'a str) -> LanguageServerName {
 187        LanguageServerName(str.to_string().into())
 188    }
 189}
 190
 191impl PartialEq<str> for LanguageServerName {
 192    fn eq(&self, other: &str) -> bool {
 193        self.0 == other
 194    }
 195}
 196
 197/// Handle to a language server RPC activity subscription.
 198pub enum Subscription {
 199    Notification {
 200        method: &'static str,
 201        notification_handlers: Option<Arc<Mutex<HashMap<&'static str, NotificationHandler>>>>,
 202    },
 203    Io {
 204        id: i32,
 205        io_handlers: Option<Weak<Mutex<HashMap<i32, IoHandler>>>>,
 206    },
 207}
 208
 209/// Language server protocol RPC request message ID.
 210///
 211/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
 212#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
 213#[serde(untagged)]
 214pub enum RequestId {
 215    Int(i32),
 216    Str(String),
 217}
 218
 219fn is_unit<T: 'static>(_: &T) -> bool {
 220    TypeId::of::<T>() == TypeId::of::<()>()
 221}
 222
 223/// Language server protocol RPC request message.
 224///
 225/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
 226#[derive(Serialize, Deserialize)]
 227pub struct Request<'a, T>
 228where
 229    T: 'static,
 230{
 231    jsonrpc: &'static str,
 232    id: RequestId,
 233    method: &'a str,
 234    #[serde(default, skip_serializing_if = "is_unit")]
 235    params: T,
 236}
 237
 238/// Language server protocol RPC request response message before it is deserialized into a concrete type.
 239#[derive(Serialize, Deserialize)]
 240struct AnyResponse<'a> {
 241    jsonrpc: &'a str,
 242    id: RequestId,
 243    #[serde(default)]
 244    error: Option<Error>,
 245    #[serde(borrow)]
 246    result: Option<&'a RawValue>,
 247}
 248
 249/// Language server protocol RPC request response message.
 250///
 251/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#responseMessage)
 252#[derive(Serialize)]
 253struct Response<T> {
 254    jsonrpc: &'static str,
 255    id: RequestId,
 256    #[serde(flatten)]
 257    value: LspResult<T>,
 258}
 259
 260#[derive(Serialize)]
 261#[serde(rename_all = "snake_case")]
 262enum LspResult<T> {
 263    #[serde(rename = "result")]
 264    Ok(Option<T>),
 265    Error(Option<Error>),
 266}
 267
 268/// Language server protocol RPC notification message.
 269///
 270/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
 271#[derive(Serialize, Deserialize)]
 272struct Notification<'a, T>
 273where
 274    T: 'static,
 275{
 276    jsonrpc: &'static str,
 277    #[serde(borrow)]
 278    method: &'a str,
 279    #[serde(default, skip_serializing_if = "is_unit")]
 280    params: T,
 281}
 282
 283/// Language server RPC notification message before it is deserialized into a concrete type.
 284#[derive(Debug, Clone, Deserialize)]
 285struct NotificationOrRequest {
 286    #[serde(default)]
 287    id: Option<RequestId>,
 288    method: String,
 289    #[serde(default)]
 290    params: Option<Value>,
 291}
 292
 293#[derive(Debug, Serialize, Deserialize)]
 294struct Error {
 295    code: i64,
 296    message: String,
 297    #[serde(default)]
 298    data: Option<serde_json::Value>,
 299}
 300
 301pub trait LspRequestFuture<O>: Future<Output = ConnectionResult<O>> {
 302    fn id(&self) -> i32;
 303}
 304
 305struct LspRequest<F> {
 306    id: i32,
 307    request: F,
 308}
 309
 310impl<F> LspRequest<F> {
 311    pub fn new(id: i32, request: F) -> Self {
 312        Self { id, request }
 313    }
 314}
 315
 316impl<F: Future> Future for LspRequest<F> {
 317    type Output = F::Output;
 318
 319    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
 320        // SAFETY: This is standard pin projection, we're pinned so our fields must be pinned.
 321        let inner = unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().request) };
 322        inner.poll(cx)
 323    }
 324}
 325
 326impl<F, O> LspRequestFuture<O> for LspRequest<F>
 327where
 328    F: Future<Output = ConnectionResult<O>>,
 329{
 330    fn id(&self) -> i32 {
 331        self.id
 332    }
 333}
 334
 335/// Combined capabilities of the server and the adapter.
 336#[derive(Debug, Clone)]
 337pub struct AdapterServerCapabilities {
 338    // Reported capabilities by the server
 339    pub server_capabilities: ServerCapabilities,
 340    // List of code actions supported by the LspAdapter matching the server
 341    pub code_action_kinds: Option<Vec<CodeActionKind>>,
 342}
 343
 344// See the VSCode docs [1] and the LSP Spec [2]
 345//
 346// [1]: https://code.visualstudio.com/api/language-extensions/semantic-highlight-guide#standard-token-types-and-modifiers
 347// [2]: https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#semanticTokenTypes
 348pub const SEMANTIC_TOKEN_TYPES: &[SemanticTokenType] = &[
 349    SemanticTokenType::NAMESPACE,
 350    SemanticTokenType::CLASS,
 351    SemanticTokenType::ENUM,
 352    SemanticTokenType::INTERFACE,
 353    SemanticTokenType::STRUCT,
 354    SemanticTokenType::TYPE_PARAMETER,
 355    SemanticTokenType::TYPE,
 356    SemanticTokenType::PARAMETER,
 357    SemanticTokenType::VARIABLE,
 358    SemanticTokenType::PROPERTY,
 359    SemanticTokenType::ENUM_MEMBER,
 360    SemanticTokenType::DECORATOR,
 361    SemanticTokenType::FUNCTION,
 362    SemanticTokenType::METHOD,
 363    SemanticTokenType::MACRO,
 364    SemanticTokenType::new("label"), // Not in the spec, but in the docs.
 365    SemanticTokenType::COMMENT,
 366    SemanticTokenType::STRING,
 367    SemanticTokenType::KEYWORD,
 368    SemanticTokenType::NUMBER,
 369    SemanticTokenType::REGEXP,
 370    SemanticTokenType::OPERATOR,
 371    SemanticTokenType::MODIFIER, // Only in the spec, not in the docs.
 372    // Language specific things below.
 373    // C#
 374    SemanticTokenType::EVENT,
 375    // Rust
 376    SemanticTokenType::new("lifetime"),
 377];
 378pub const SEMANTIC_TOKEN_MODIFIERS: &[SemanticTokenModifier] = &[
 379    SemanticTokenModifier::DECLARATION,
 380    SemanticTokenModifier::DEFINITION,
 381    SemanticTokenModifier::READONLY,
 382    SemanticTokenModifier::STATIC,
 383    SemanticTokenModifier::DEPRECATED,
 384    SemanticTokenModifier::ABSTRACT,
 385    SemanticTokenModifier::ASYNC,
 386    SemanticTokenModifier::MODIFICATION,
 387    SemanticTokenModifier::DOCUMENTATION,
 388    SemanticTokenModifier::DEFAULT_LIBRARY,
 389    // Language specific things below.
 390    // Rust
 391    SemanticTokenModifier::new("constant"),
 392];
 393
 394impl LanguageServer {
 395    /// Starts a language server process.
 396    /// A request_timeout of zero or Duration::MAX indicates an indefinite timeout.
 397    pub fn new(
 398        stderr_capture: Arc<Mutex<Option<String>>>,
 399        server_id: LanguageServerId,
 400        server_name: LanguageServerName,
 401        binary: LanguageServerBinary,
 402        root_path: &Path,
 403        code_action_kinds: Option<Vec<CodeActionKind>>,
 404        workspace_folders: Option<Arc<Mutex<BTreeSet<Uri>>>>,
 405        cx: &mut AsyncApp,
 406    ) -> Result<Self> {
 407        let working_dir = if root_path.is_dir() {
 408            root_path
 409        } else {
 410            root_path.parent().unwrap_or_else(|| Path::new("/"))
 411        };
 412        let root_uri = Uri::from_file_path(&working_dir)
 413            .map_err(|()| anyhow!("{working_dir:?} is not a valid URI"))?;
 414        log::info!(
 415            "starting language server process. binary path: \
 416            {:?}, working directory: {:?}, args: {:?}",
 417            binary.path,
 418            working_dir,
 419            &binary.arguments
 420        );
 421        let mut command = util::command::new_command(&binary.path);
 422        command
 423            .current_dir(working_dir)
 424            .args(&binary.arguments)
 425            .envs(binary.env.clone().unwrap_or_default())
 426            .stdin(Stdio::piped())
 427            .stdout(Stdio::piped())
 428            .stderr(Stdio::piped())
 429            .kill_on_drop(true);
 430
 431        let mut server = command
 432            .spawn()
 433            .with_context(|| format!("failed to spawn command {command:?}",))?;
 434
 435        let stdin = server.stdin.take().unwrap();
 436        let stdout = server.stdout.take().unwrap();
 437        let stderr = server.stderr.take().unwrap();
 438        let server = Self::new_internal(
 439            server_id,
 440            server_name,
 441            stdin,
 442            stdout,
 443            Some(stderr),
 444            stderr_capture,
 445            Some(server),
 446            code_action_kinds,
 447            binary,
 448            root_uri,
 449            workspace_folders,
 450            cx,
 451            move |notification| {
 452                log::info!(
 453                    "Language server with id {} sent unhandled notification {}:\n{}",
 454                    server_id,
 455                    notification.method,
 456                    serde_json::to_string_pretty(&notification.params).unwrap(),
 457                );
 458                false
 459            },
 460        );
 461
 462        Ok(server)
 463    }
 464
 465    fn new_internal<Stdin, Stdout, Stderr, F>(
 466        server_id: LanguageServerId,
 467        server_name: LanguageServerName,
 468        stdin: Stdin,
 469        stdout: Stdout,
 470        stderr: Option<Stderr>,
 471        stderr_capture: Arc<Mutex<Option<String>>>,
 472        server: Option<Child>,
 473        code_action_kinds: Option<Vec<CodeActionKind>>,
 474        binary: LanguageServerBinary,
 475        root_uri: Uri,
 476        workspace_folders: Option<Arc<Mutex<BTreeSet<Uri>>>>,
 477        cx: &mut AsyncApp,
 478        on_unhandled_notification: F,
 479    ) -> Self
 480    where
 481        Stdin: AsyncWrite + Unpin + Send + 'static,
 482        Stdout: AsyncRead + Unpin + Send + 'static,
 483        Stderr: AsyncRead + Unpin + Send + 'static,
 484        F: Fn(&NotificationOrRequest) -> bool + 'static + Send + Sync + Clone,
 485    {
 486        let (outbound_tx, outbound_rx) = channel::unbounded::<String>();
 487        let (output_done_tx, output_done_rx) = barrier::channel();
 488        let notification_handlers =
 489            Arc::new(Mutex::new(HashMap::<_, NotificationHandler>::default()));
 490        let response_handlers =
 491            Arc::new(Mutex::new(Some(HashMap::<_, ResponseHandler>::default())));
 492        let pending_respond_tasks = PendingRespondTasks::default();
 493        let io_handlers = Arc::new(Mutex::new(HashMap::default()));
 494
 495        let stdout_input_task = cx.spawn({
 496            let unhandled_notification_wrapper = {
 497                let response_channel = outbound_tx.clone();
 498                async move |msg: NotificationOrRequest| {
 499                    let did_handle = on_unhandled_notification(&msg);
 500                    if !did_handle && let Some(message_id) = msg.id {
 501                        let response = AnyResponse {
 502                            jsonrpc: JSON_RPC_VERSION,
 503                            id: message_id,
 504                            error: Some(Error {
 505                                code: -32601,
 506                                message: format!("Unrecognized method `{}`", msg.method),
 507                                data: None,
 508                            }),
 509                            result: None,
 510                        };
 511                        if let Ok(response) = serde_json::to_string(&response) {
 512                            response_channel.send(response).await.ok();
 513                        }
 514                    }
 515                }
 516            };
 517            let notification_handlers = notification_handlers.clone();
 518            let response_handlers = response_handlers.clone();
 519            let io_handlers = io_handlers.clone();
 520            let pending_respond_tasks = pending_respond_tasks.clone();
 521            async move |cx| {
 522                Self::handle_incoming_messages(
 523                    stdout,
 524                    unhandled_notification_wrapper,
 525                    notification_handlers,
 526                    response_handlers,
 527                    pending_respond_tasks,
 528                    io_handlers,
 529                    cx,
 530                )
 531                .log_err()
 532                .await
 533            }
 534        });
 535        let stderr_input_task = stderr
 536            .map(|stderr| {
 537                let io_handlers = io_handlers.clone();
 538                let stderr_captures = stderr_capture.clone();
 539                cx.background_spawn(async move {
 540                    Self::handle_stderr(stderr, io_handlers, stderr_captures)
 541                        .log_err()
 542                        .await
 543                })
 544            })
 545            .unwrap_or_else(|| Task::ready(None));
 546        let input_task = cx.background_spawn(async move {
 547            let (stdout, stderr) = futures::join!(stdout_input_task, stderr_input_task);
 548            stdout.or(stderr)
 549        });
 550        let output_task = cx.background_spawn({
 551            Self::handle_outgoing_messages(
 552                stdin,
 553                outbound_rx,
 554                output_done_tx,
 555                response_handlers.clone(),
 556                io_handlers.clone(),
 557            )
 558            .log_err()
 559        });
 560
 561        let configuration = DidChangeConfigurationParams {
 562            settings: Value::Null,
 563        }
 564        .into();
 565
 566        let (notification_tx, notification_rx) = channel::unbounded::<NotificationSerializer>();
 567        cx.background_spawn({
 568            let outbound_tx = outbound_tx.clone();
 569            async move {
 570                while let Ok(serializer) = notification_rx.recv().await {
 571                    let serialized = (serializer.0)();
 572                    let Ok(_) = outbound_tx.send(serialized).await else {
 573                        return;
 574                    };
 575                }
 576                outbound_tx.close();
 577            }
 578        })
 579        .detach();
 580        Self {
 581            server_id,
 582            notification_handlers,
 583            notification_tx,
 584            response_handlers,
 585            pending_respond_tasks,
 586            io_handlers,
 587            name: server_name,
 588            version: None,
 589            process_name: binary
 590                .path
 591                .file_name()
 592                .map(|name| Arc::from(name.to_string_lossy()))
 593                .unwrap_or_default(),
 594            binary,
 595            capabilities: Default::default(),
 596            configuration,
 597            code_action_kinds,
 598            next_id: Default::default(),
 599            outbound_tx,
 600            executor: cx.background_executor().clone(),
 601            io_tasks: Mutex::new(Some((input_task, output_task))),
 602            output_done_rx: Mutex::new(Some(output_done_rx)),
 603            server: Arc::new(Mutex::new(server)),
 604            workspace_folders,
 605            root_uri,
 606        }
 607    }
 608
 609    /// List of code action kinds this language server reports being able to emit.
 610    pub fn code_action_kinds(&self) -> Option<Vec<CodeActionKind>> {
 611        self.code_action_kinds.clone()
 612    }
 613
 614    async fn handle_incoming_messages<Stdout>(
 615        stdout: Stdout,
 616        on_unhandled_notification: impl AsyncFn(NotificationOrRequest) + 'static + Send,
 617        notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
 618        response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
 619        pending_respond_tasks: PendingRespondTasks,
 620        io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 621        cx: &mut AsyncApp,
 622    ) -> anyhow::Result<()>
 623    where
 624        Stdout: AsyncRead + Unpin + Send + 'static,
 625    {
 626        use smol::stream::StreamExt;
 627        let stdout = BufReader::new(stdout);
 628        let _clear_response_handlers = util::defer({
 629            let response_handlers = response_handlers.clone();
 630            move || {
 631                response_handlers.lock().take();
 632            }
 633        });
 634        let mut input_handler = input_handler::LspStdoutHandler::new(
 635            stdout,
 636            response_handlers,
 637            io_handlers,
 638            cx.background_executor().clone(),
 639        );
 640
 641        while let Some(msg) = input_handler.incoming_messages.next().await {
 642            if msg.method == <notification::Cancel as notification::Notification>::METHOD {
 643                if let Some(params) = msg.params {
 644                    if let Ok(cancel_params) = serde_json::from_value::<CancelParams>(params) {
 645                        let id = match cancel_params.id {
 646                            NumberOrString::Number(id) => RequestId::Int(id),
 647                            NumberOrString::String(id) => RequestId::Str(id),
 648                        };
 649                        pending_respond_tasks.lock().remove(&id);
 650                    }
 651                }
 652                continue;
 653            }
 654
 655            let unhandled_message = {
 656                let mut notification_handlers = notification_handlers.lock();
 657                if let Some(handler) = notification_handlers.get_mut(msg.method.as_str()) {
 658                    handler(msg.id, msg.params.unwrap_or(Value::Null), cx);
 659                    None
 660                } else {
 661                    Some(msg)
 662                }
 663            };
 664
 665            if let Some(msg) = unhandled_message {
 666                on_unhandled_notification(msg).await;
 667            }
 668
 669            // Don't starve the main thread when receiving lots of notifications at once.
 670            smol::future::yield_now().await;
 671        }
 672        input_handler.loop_handle.await
 673    }
 674
 675    async fn handle_stderr<Stderr>(
 676        stderr: Stderr,
 677        io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 678        stderr_capture: Arc<Mutex<Option<String>>>,
 679    ) -> anyhow::Result<()>
 680    where
 681        Stderr: AsyncRead + Unpin + Send + 'static,
 682    {
 683        let mut stderr = BufReader::new(stderr);
 684        let mut buffer = Vec::new();
 685
 686        loop {
 687            buffer.clear();
 688
 689            let bytes_read = stderr.read_until(b'\n', &mut buffer).await?;
 690            if bytes_read == 0 {
 691                return Ok(());
 692            }
 693
 694            if let Ok(message) = std::str::from_utf8(&buffer) {
 695                log::trace!("incoming stderr message:{message}");
 696                for handler in io_handlers.lock().values_mut() {
 697                    handler(IoKind::StdErr, message);
 698                }
 699
 700                if let Some(stderr) = stderr_capture.lock().as_mut() {
 701                    stderr.push_str(message);
 702                }
 703            }
 704
 705            // Don't starve the main thread when receiving lots of messages at once.
 706            smol::future::yield_now().await;
 707        }
 708    }
 709
 710    async fn handle_outgoing_messages<Stdin>(
 711        stdin: Stdin,
 712        outbound_rx: channel::Receiver<String>,
 713        output_done_tx: barrier::Sender,
 714        response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
 715        io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 716    ) -> anyhow::Result<()>
 717    where
 718        Stdin: AsyncWrite + Unpin + Send + 'static,
 719    {
 720        let mut stdin = BufWriter::new(stdin);
 721        let _clear_response_handlers = util::defer({
 722            let response_handlers = response_handlers.clone();
 723            move || {
 724                response_handlers.lock().take();
 725            }
 726        });
 727        let mut content_len_buffer = Vec::new();
 728        while let Ok(message) = outbound_rx.recv().await {
 729            log::trace!("outgoing message:{}", message);
 730            for handler in io_handlers.lock().values_mut() {
 731                handler(IoKind::StdIn, &message);
 732            }
 733
 734            content_len_buffer.clear();
 735            write!(content_len_buffer, "{}", message.len()).unwrap();
 736            stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
 737            stdin.write_all(&content_len_buffer).await?;
 738            stdin.write_all("\r\n\r\n".as_bytes()).await?;
 739            stdin.write_all(message.as_bytes()).await?;
 740            stdin.flush().await?;
 741        }
 742        drop(output_done_tx);
 743        Ok(())
 744    }
 745
 746    pub fn default_initialize_params(
 747        &self,
 748        pull_diagnostics: bool,
 749        augments_syntax_tokens: bool,
 750        cx: &App,
 751    ) -> InitializeParams {
 752        let workspace_folders = self.workspace_folders.as_ref().map_or_else(
 753            || {
 754                vec![WorkspaceFolder {
 755                    name: Default::default(),
 756                    uri: self.root_uri.clone(),
 757                }]
 758            },
 759            |folders| {
 760                folders
 761                    .lock()
 762                    .iter()
 763                    .cloned()
 764                    .map(|uri| WorkspaceFolder {
 765                        name: Default::default(),
 766                        uri,
 767                    })
 768                    .collect()
 769            },
 770        );
 771
 772        #[allow(deprecated)]
 773        InitializeParams {
 774            process_id: Some(std::process::id()),
 775            root_path: Some(
 776                self.root_uri
 777                    .to_file_path()
 778                    .map(|path| path.to_string_lossy().into_owned())
 779                    .unwrap_or_else(|_| self.root_uri.path().to_string()),
 780            ),
 781            root_uri: Some(self.root_uri.clone()),
 782            initialization_options: None,
 783            capabilities: ClientCapabilities {
 784                general: Some(GeneralClientCapabilities {
 785                    position_encodings: Some(vec![PositionEncodingKind::UTF16]),
 786                    ..GeneralClientCapabilities::default()
 787                }),
 788                workspace: Some(WorkspaceClientCapabilities {
 789                    configuration: Some(true),
 790                    did_change_watched_files: Some(DidChangeWatchedFilesClientCapabilities {
 791                        dynamic_registration: Some(true),
 792                        relative_pattern_support: Some(true),
 793                    }),
 794                    did_change_configuration: Some(DynamicRegistrationClientCapabilities {
 795                        dynamic_registration: Some(true),
 796                    }),
 797                    workspace_folders: Some(true),
 798                    symbol: Some(WorkspaceSymbolClientCapabilities {
 799                        resolve_support: None,
 800                        dynamic_registration: Some(true),
 801                        ..WorkspaceSymbolClientCapabilities::default()
 802                    }),
 803                    inlay_hint: Some(InlayHintWorkspaceClientCapabilities {
 804                        refresh_support: Some(true),
 805                    }),
 806                    diagnostics: Some(DiagnosticWorkspaceClientCapabilities {
 807                        refresh_support: Some(true),
 808                    })
 809                    .filter(|_| pull_diagnostics),
 810                    code_lens: Some(CodeLensWorkspaceClientCapabilities {
 811                        refresh_support: Some(true),
 812                    }),
 813                    workspace_edit: Some(WorkspaceEditClientCapabilities {
 814                        resource_operations: Some(vec![
 815                            ResourceOperationKind::Create,
 816                            ResourceOperationKind::Rename,
 817                            ResourceOperationKind::Delete,
 818                        ]),
 819                        document_changes: Some(true),
 820                        snippet_edit_support: Some(true),
 821                        ..WorkspaceEditClientCapabilities::default()
 822                    }),
 823                    file_operations: Some(WorkspaceFileOperationsClientCapabilities {
 824                        dynamic_registration: Some(true),
 825                        did_rename: Some(true),
 826                        will_rename: Some(true),
 827                        ..WorkspaceFileOperationsClientCapabilities::default()
 828                    }),
 829                    apply_edit: Some(true),
 830                    execute_command: Some(ExecuteCommandClientCapabilities {
 831                        dynamic_registration: Some(true),
 832                    }),
 833                    semantic_tokens: Some(SemanticTokensWorkspaceClientCapabilities {
 834                        refresh_support: Some(true),
 835                    }),
 836                    ..WorkspaceClientCapabilities::default()
 837                }),
 838                text_document: Some(TextDocumentClientCapabilities {
 839                    definition: Some(GotoCapability {
 840                        link_support: Some(true),
 841                        dynamic_registration: Some(true),
 842                    }),
 843                    code_action: Some(CodeActionClientCapabilities {
 844                        code_action_literal_support: Some(CodeActionLiteralSupport {
 845                            code_action_kind: CodeActionKindLiteralSupport {
 846                                value_set: vec![
 847                                    CodeActionKind::REFACTOR.as_str().into(),
 848                                    CodeActionKind::QUICKFIX.as_str().into(),
 849                                    CodeActionKind::SOURCE.as_str().into(),
 850                                ],
 851                            },
 852                        }),
 853                        data_support: Some(true),
 854                        resolve_support: Some(CodeActionCapabilityResolveSupport {
 855                            properties: vec![
 856                                "kind".to_string(),
 857                                "diagnostics".to_string(),
 858                                "isPreferred".to_string(),
 859                                "disabled".to_string(),
 860                                "edit".to_string(),
 861                                "command".to_string(),
 862                            ],
 863                        }),
 864                        dynamic_registration: Some(true),
 865                        ..CodeActionClientCapabilities::default()
 866                    }),
 867                    completion: Some(CompletionClientCapabilities {
 868                        completion_item: Some(CompletionItemCapability {
 869                            snippet_support: Some(true),
 870                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
 871                                properties: vec![
 872                                    "additionalTextEdits".to_string(),
 873                                    "command".to_string(),
 874                                    "detail".to_string(),
 875                                    "documentation".to_string(),
 876                                    // NB: Do not have this resolved, otherwise Zed becomes slow to complete things
 877                                    // "textEdit".to_string(),
 878                                ],
 879                            }),
 880                            deprecated_support: Some(true),
 881                            tag_support: Some(TagSupport {
 882                                value_set: vec![CompletionItemTag::DEPRECATED],
 883                            }),
 884                            insert_replace_support: Some(true),
 885                            label_details_support: Some(true),
 886                            insert_text_mode_support: Some(InsertTextModeSupport {
 887                                value_set: vec![
 888                                    InsertTextMode::AS_IS,
 889                                    InsertTextMode::ADJUST_INDENTATION,
 890                                ],
 891                            }),
 892                            documentation_format: Some(vec![
 893                                MarkupKind::Markdown,
 894                                MarkupKind::PlainText,
 895                            ]),
 896                            ..CompletionItemCapability::default()
 897                        }),
 898                        insert_text_mode: Some(InsertTextMode::ADJUST_INDENTATION),
 899                        completion_list: Some(CompletionListCapability {
 900                            item_defaults: Some(vec![
 901                                "commitCharacters".to_owned(),
 902                                "editRange".to_owned(),
 903                                "insertTextMode".to_owned(),
 904                                "insertTextFormat".to_owned(),
 905                                "data".to_owned(),
 906                            ]),
 907                        }),
 908                        context_support: Some(true),
 909                        dynamic_registration: Some(true),
 910                        ..CompletionClientCapabilities::default()
 911                    }),
 912                    rename: Some(RenameClientCapabilities {
 913                        prepare_support: Some(true),
 914                        prepare_support_default_behavior: Some(
 915                            PrepareSupportDefaultBehavior::IDENTIFIER,
 916                        ),
 917                        dynamic_registration: Some(true),
 918                        ..RenameClientCapabilities::default()
 919                    }),
 920                    hover: Some(HoverClientCapabilities {
 921                        content_format: Some(vec![MarkupKind::Markdown]),
 922                        dynamic_registration: Some(true),
 923                    }),
 924                    inlay_hint: Some(InlayHintClientCapabilities {
 925                        resolve_support: Some(InlayHintResolveClientCapabilities {
 926                            properties: vec![
 927                                "textEdits".to_string(),
 928                                "tooltip".to_string(),
 929                                "label.tooltip".to_string(),
 930                                "label.location".to_string(),
 931                                "label.command".to_string(),
 932                            ],
 933                        }),
 934                        dynamic_registration: Some(true),
 935                    }),
 936                    semantic_tokens: Some(SemanticTokensClientCapabilities {
 937                        dynamic_registration: Some(false),
 938                        requests: SemanticTokensClientCapabilitiesRequests {
 939                            range: None,
 940                            full: Some(SemanticTokensFullOptions::Delta { delta: Some(true) }),
 941                        },
 942                        token_types: SEMANTIC_TOKEN_TYPES.to_vec(),
 943                        token_modifiers: SEMANTIC_TOKEN_MODIFIERS.to_vec(),
 944                        formats: vec![TokenFormat::RELATIVE],
 945                        overlapping_token_support: Some(true),
 946                        multiline_token_support: Some(true),
 947                        server_cancel_support: Some(true),
 948                        augments_syntax_tokens: Some(augments_syntax_tokens),
 949                    }),
 950                    publish_diagnostics: Some(PublishDiagnosticsClientCapabilities {
 951                        related_information: Some(true),
 952                        version_support: Some(true),
 953                        data_support: Some(true),
 954                        tag_support: Some(TagSupport {
 955                            value_set: vec![DiagnosticTag::UNNECESSARY, DiagnosticTag::DEPRECATED],
 956                        }),
 957                        code_description_support: Some(true),
 958                    }),
 959                    formatting: Some(DynamicRegistrationClientCapabilities {
 960                        dynamic_registration: Some(true),
 961                    }),
 962                    range_formatting: Some(DynamicRegistrationClientCapabilities {
 963                        dynamic_registration: Some(true),
 964                    }),
 965                    on_type_formatting: Some(DynamicRegistrationClientCapabilities {
 966                        dynamic_registration: Some(true),
 967                    }),
 968                    signature_help: Some(SignatureHelpClientCapabilities {
 969                        signature_information: Some(SignatureInformationSettings {
 970                            documentation_format: Some(vec![
 971                                MarkupKind::Markdown,
 972                                MarkupKind::PlainText,
 973                            ]),
 974                            parameter_information: Some(ParameterInformationSettings {
 975                                label_offset_support: Some(true),
 976                            }),
 977                            active_parameter_support: Some(true),
 978                        }),
 979                        dynamic_registration: Some(true),
 980                        ..SignatureHelpClientCapabilities::default()
 981                    }),
 982                    synchronization: Some(TextDocumentSyncClientCapabilities {
 983                        did_save: Some(true),
 984                        dynamic_registration: Some(true),
 985                        ..TextDocumentSyncClientCapabilities::default()
 986                    }),
 987                    code_lens: Some(CodeLensClientCapabilities {
 988                        dynamic_registration: Some(true),
 989                    }),
 990                    document_symbol: Some(DocumentSymbolClientCapabilities {
 991                        hierarchical_document_symbol_support: Some(true),
 992                        dynamic_registration: Some(true),
 993                        ..DocumentSymbolClientCapabilities::default()
 994                    }),
 995                    diagnostic: Some(DiagnosticClientCapabilities {
 996                        dynamic_registration: Some(true),
 997                        related_document_support: Some(true),
 998                    })
 999                    .filter(|_| pull_diagnostics),
1000                    color_provider: Some(DocumentColorClientCapabilities {
1001                        dynamic_registration: Some(true),
1002                    }),
1003                    folding_range: Some(FoldingRangeClientCapabilities {
1004                        dynamic_registration: Some(true),
1005                        line_folding_only: Some(false),
1006                        range_limit: None,
1007                        folding_range: Some(FoldingRangeCapability {
1008                            collapsed_text: Some(true),
1009                        }),
1010                        folding_range_kind: Some(FoldingRangeKindCapability {
1011                            value_set: Some(vec![
1012                                FoldingRangeKind::Comment,
1013                                FoldingRangeKind::Region,
1014                                FoldingRangeKind::Imports,
1015                            ]),
1016                        }),
1017                    }),
1018                    ..TextDocumentClientCapabilities::default()
1019                }),
1020                experimental: Some(json!({
1021                    "serverStatusNotification": true,
1022                    "localDocs": true,
1023                })),
1024                window: Some(WindowClientCapabilities {
1025                    work_done_progress: Some(true),
1026                    show_message: Some(ShowMessageRequestClientCapabilities {
1027                        message_action_item: Some(MessageActionItemCapabilities {
1028                            additional_properties_support: Some(true),
1029                        }),
1030                    }),
1031                    ..WindowClientCapabilities::default()
1032                }),
1033            },
1034            trace: None,
1035            workspace_folders: Some(workspace_folders),
1036            client_info: release_channel::ReleaseChannel::try_global(cx).map(|release_channel| {
1037                ClientInfo {
1038                    name: release_channel.display_name().to_string(),
1039                    version: Some(release_channel::AppVersion::global(cx).to_string()),
1040                }
1041            }),
1042            locale: None,
1043            ..InitializeParams::default()
1044        }
1045    }
1046
1047    /// Initializes a language server by sending the `Initialize` request.
1048    /// Note that `options` is used directly to construct [`InitializeParams`], which is why it is owned.
1049    ///
1050    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#initialize)
1051    pub fn initialize(
1052        mut self,
1053        params: InitializeParams,
1054        configuration: Arc<DidChangeConfigurationParams>,
1055        timeout: Duration,
1056        cx: &App,
1057    ) -> Task<Result<Arc<Self>>> {
1058        cx.background_spawn(async move {
1059            let response = self
1060                .request::<request::Initialize>(params, timeout)
1061                .await
1062                .into_response()
1063                .with_context(|| {
1064                    format!(
1065                        "initializing server {}, id {}",
1066                        self.name(),
1067                        self.server_id()
1068                    )
1069                })?;
1070            if let Some(info) = response.server_info {
1071                self.version = info.version.map(SharedString::from);
1072                self.process_name = info.name.into();
1073            }
1074            self.capabilities = RwLock::new(response.capabilities);
1075            self.configuration = configuration;
1076
1077            self.notify::<notification::Initialized>(InitializedParams {})?;
1078            Ok(Arc::new(self))
1079        })
1080    }
1081
1082    /// Sends a shutdown request to the language server process and prepares the [`LanguageServer`] to be dropped.
1083    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>> + use<>> {
1084        let tasks = self.io_tasks.lock().take()?;
1085
1086        let response_handlers = self.response_handlers.clone();
1087        let next_id = AtomicI32::new(self.next_id.load(SeqCst));
1088        let outbound_tx = self.outbound_tx.clone();
1089        let executor = self.executor.clone();
1090        let notification_serializers = self.notification_tx.clone();
1091        let mut output_done = self.output_done_rx.lock().take().unwrap();
1092        let shutdown_request = Self::request_internal::<request::Shutdown>(
1093            &next_id,
1094            &response_handlers,
1095            &outbound_tx,
1096            &notification_serializers,
1097            &executor,
1098            SERVER_SHUTDOWN_TIMEOUT,
1099            (),
1100        );
1101
1102        let server = self.server.clone();
1103        let name = self.name.clone();
1104        let server_id = self.server_id;
1105        let mut timer = self.executor.timer(SERVER_SHUTDOWN_TIMEOUT).fuse();
1106        Some(async move {
1107            log::debug!("language server shutdown started");
1108
1109            select! {
1110                request_result = shutdown_request.fuse() => {
1111                    match request_result {
1112                        ConnectionResult::Timeout => {
1113                            log::warn!("timeout waiting for language server {name} (id {server_id}) to shutdown");
1114                        },
1115                        ConnectionResult::ConnectionReset => {
1116                            log::warn!("language server {name} (id {server_id}) closed the shutdown request connection");
1117                        },
1118                        ConnectionResult::Result(Err(e)) => {
1119                            log::error!("Shutdown request failure, server {name} (id {server_id}): {e:#}");
1120                        },
1121                        ConnectionResult::Result(Ok(())) => {}
1122                    }
1123                }
1124
1125                _ = timer => {
1126                    log::info!("timeout waiting for language server {name} (id {server_id}) to shutdown");
1127                },
1128            }
1129
1130            response_handlers.lock().take();
1131            Self::notify_internal::<notification::Exit>(&notification_serializers, ()).ok();
1132            notification_serializers.close();
1133            output_done.recv().await;
1134            server.lock().take().map(|mut child| child.kill());
1135            drop(tasks);
1136            log::debug!("language server shutdown finished");
1137            Some(())
1138        })
1139    }
1140
    /// Register a handler to handle incoming LSP notifications.
    ///
    /// The handler is registered under `T::METHOD` and receives the notification's
    /// parameters deserialized as `T::Params`.
    ///
    /// # Panics
    ///
    /// Panics if a handler is already registered for `T::METHOD`.
    ///
    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
    #[must_use]
    pub fn on_notification<T, F>(&self, f: F) -> Subscription
    where
        T: notification::Notification,
        F: 'static + Send + FnMut(T::Params, &mut AsyncApp),
    {
        self.on_custom_notification(T::METHOD, f)
    }
1152
    /// Register a handler to handle incoming LSP requests.
    ///
    /// The handler is registered under `T::METHOD`. It receives the request's
    /// parameters deserialized as `T::Params` and returns a future whose output is
    /// sent back to the server as the response (or as an error response on `Err`).
    ///
    /// # Panics
    ///
    /// Panics if a handler is already registered for `T::METHOD`.
    ///
    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
    #[must_use]
    pub fn on_request<T, F, Fut>(&self, f: F) -> Subscription
    where
        T: request::Request,
        T::Params: 'static + Send,
        F: 'static + FnMut(T::Params, &mut AsyncApp) -> Fut + Send,
        Fut: 'static + Future<Output = Result<T::Result>>,
    {
        self.on_custom_request(T::METHOD, f)
    }
1166
1167    /// Registers a handler to inspect all language server process stdio.
1168    #[must_use]
1169    pub fn on_io<F>(&self, f: F) -> Subscription
1170    where
1171        F: 'static + Send + FnMut(IoKind, &str),
1172    {
1173        let id = self.next_id.fetch_add(1, SeqCst);
1174        self.io_handlers.lock().insert(id, Box::new(f));
1175        Subscription::Io {
1176            id,
1177            io_handlers: Some(Arc::downgrade(&self.io_handlers)),
1178        }
1179    }
1180
    /// Removes a request handler registered via [`Self::on_request`].
    ///
    /// Request handlers are stored in the same map as notification handlers, keyed
    /// by method name, which is why the entry is removed from `notification_handlers`.
    pub fn remove_request_handler<T: request::Request>(&self) {
        self.notification_handlers.lock().remove(T::METHOD);
    }
1185
    /// Removes a notification handler registered via [`Self::on_notification`].
    ///
    /// Does nothing if no handler is registered for `T::METHOD`.
    pub fn remove_notification_handler<T: notification::Notification>(&self) {
        self.notification_handlers.lock().remove(T::METHOD);
    }
1190
    /// Checks if a notification handler has been registered via [`Self::on_notification`].
    ///
    /// The lookup is by method name only (`T::METHOD`).
    pub fn has_notification_handler<T: notification::Notification>(&self) -> bool {
        self.notification_handlers.lock().contains_key(T::METHOD)
    }
1195
1196    #[must_use]
1197    fn on_custom_notification<Params, F>(&self, method: &'static str, mut f: F) -> Subscription
1198    where
1199        F: 'static + FnMut(Params, &mut AsyncApp) + Send,
1200        Params: DeserializeOwned,
1201    {
1202        let prev_handler = self.notification_handlers.lock().insert(
1203            method,
1204            Box::new(move |_, params, cx| {
1205                if let Some(params) = serde_json::from_value(params).log_err() {
1206                    f(params, cx);
1207                }
1208            }),
1209        );
1210        assert!(
1211            prev_handler.is_none(),
1212            "registered multiple handlers for the same LSP method"
1213        );
1214        Subscription::Notification {
1215            method,
1216            notification_handlers: Some(self.notification_handlers.clone()),
1217        }
1218    }
1219
1220    #[must_use]
1221    fn on_custom_request<Params, Res, Fut, F>(&self, method: &'static str, mut f: F) -> Subscription
1222    where
1223        F: 'static + FnMut(Params, &mut AsyncApp) -> Fut + Send,
1224        Fut: 'static + Future<Output = Result<Res>>,
1225        Params: DeserializeOwned + Send + 'static,
1226        Res: Serialize,
1227    {
1228        let outbound_tx = self.outbound_tx.clone();
1229        let pending_respond_tasks = self.pending_respond_tasks.clone();
1230        let prev_handler = self.notification_handlers.lock().insert(
1231            method,
1232            Box::new(move |id, params, cx| {
1233                if let Some(id) = id {
1234                    match serde_json::from_value(params) {
1235                        Ok(params) => {
1236                            let response = f(params, cx);
1237                            let task = cx.foreground_executor().spawn({
1238                                let outbound_tx = outbound_tx.clone();
1239                                let pending_respond_tasks = pending_respond_tasks.clone();
1240                                let id = id.clone();
1241                                async move {
1242                                    let response = match response.await {
1243                                        Ok(result) => Response {
1244                                            jsonrpc: JSON_RPC_VERSION,
1245                                            id: id.clone(),
1246                                            value: LspResult::Ok(Some(result)),
1247                                        },
1248                                        Err(error) => Response {
1249                                            jsonrpc: JSON_RPC_VERSION,
1250                                            id: id.clone(),
1251                                            value: LspResult::Error(Some(Error {
1252                                                code: lsp_types::error_codes::REQUEST_FAILED,
1253                                                message: error.to_string(),
1254                                                data: None,
1255                                            })),
1256                                        },
1257                                    };
1258                                    if let Some(response) =
1259                                        serde_json::to_string(&response).log_err()
1260                                    {
1261                                        outbound_tx.try_send(response).ok();
1262                                    }
1263                                    pending_respond_tasks.lock().remove(&id);
1264                                }
1265                            });
1266                            pending_respond_tasks.lock().insert(id, task);
1267                        }
1268
1269                        Err(error) => {
1270                            log::error!("error deserializing {} request: {:?}", method, error);
1271                            let response = AnyResponse {
1272                                jsonrpc: JSON_RPC_VERSION,
1273                                id,
1274                                result: None,
1275                                error: Some(Error {
1276                                    code: -32700, // Parse error
1277                                    message: error.to_string(),
1278                                    data: None,
1279                                }),
1280                            };
1281                            if let Some(response) = serde_json::to_string(&response).log_err() {
1282                                outbound_tx.try_send(response).ok();
1283                            }
1284                        }
1285                    }
1286                }
1287            }),
1288        );
1289        assert!(
1290            prev_handler.is_none(),
1291            "registered multiple handlers for the same LSP method"
1292        );
1293        Subscription::Notification {
1294            method,
1295            notification_handlers: Some(self.notification_handlers.clone()),
1296        }
1297    }
1298
    /// Get the name of the running language server.
    ///
    /// Returns a clone of the stored name.
    pub fn name(&self) -> LanguageServerName {
        self.name.clone()
    }
1303
    /// Get the version of the running language server.
    ///
    /// This is the raw version string; `initialize` sets it from the server info in
    /// the `initialize` response when the server provides one. See
    /// [`Self::readable_version`] for a display-oriented variant.
    pub fn version(&self) -> Option<SharedString> {
        self.version.clone()
    }
1308
1309    /// Get the readable version of the running language server.
1310    pub fn readable_version(&self) -> Option<SharedString> {
1311        match self.name().as_ref() {
1312            "gopls" => {
1313                // Gopls returns a detailed JSON object as its version string; we must parse it to extract the semantic version.
1314                // Example: `{"GoVersion":"go1.26.0","Path":"golang.org/x/tools/gopls","Main":{},"Deps":[],"Settings":[],"Version":"v0.21.1"}`
1315                self.version
1316                    .as_ref()
1317                    .and_then(|obj| {
1318                        #[derive(Deserialize)]
1319                        struct GoplsVersion<'a> {
1320                            #[serde(rename = "Version")]
1321                            version: &'a str,
1322                        }
1323                        let parsed: GoplsVersion = serde_json::from_str(obj.as_str()).ok()?;
1324                        Some(parsed.version.trim_start_matches("v").to_owned().into())
1325                    })
1326                    .or_else(|| self.version.clone())
1327            }
1328            _ => self.version.clone(),
1329        }
1330    }
1331
    /// Get the process name of the running language server.
    ///
    /// `initialize` overwrites this with the name from the server info in the
    /// `initialize` response when the server provides one.
    pub fn process_name(&self) -> &str {
        &self.process_name
    }
1336
    /// Get the reported capabilities of the running language server.
    ///
    /// Returns a clone taken under the read lock, so later calls to
    /// [`Self::update_capabilities`] do not affect the returned value.
    pub fn capabilities(&self) -> ServerCapabilities {
        self.capabilities.read().clone()
    }
1341
    /// Get the reported capabilities of the running language server and
    /// what we know on the client/adapter-side of its capabilities.
    ///
    /// Combines [`Self::capabilities`] with the result of `code_action_kinds`.
    pub fn adapter_server_capabilities(&self) -> AdapterServerCapabilities {
        AdapterServerCapabilities {
            server_capabilities: self.capabilities(),
            code_action_kinds: self.code_action_kinds(),
        }
    }
1350
1351    /// Update the capabilities of the running language server.
1352    pub fn update_capabilities(&self, update: impl FnOnce(&mut ServerCapabilities)) {
1353        update(self.capabilities.write().deref_mut());
1354    }
1355
    /// Get the individual configuration settings for the running language server.
    /// Does not include globally applied settings (which are stored in ProjectSettings::GlobalLspSettings).
    ///
    /// Returns the `settings` value of the stored configuration parameters.
    pub fn configuration(&self) -> &Value {
        &self.configuration.settings
    }
1361
    /// Get the ID of the running language server.
    ///
    /// This is the [`LanguageServerId`] stored on this instance, not the OS process
    /// id; see [`Self::process_id`] for the latter.
    pub fn server_id(&self) -> LanguageServerId {
        self.server_id
    }
1366
1367    /// Get the process ID of the running language server, if available.
1368    pub fn process_id(&self) -> Option<u32> {
1369        self.server.lock().as_ref().map(|child| child.id())
1370    }
1371
    /// Get the binary information of the running language server.
    ///
    /// Returns a reference to the stored [`LanguageServerBinary`].
    pub fn binary(&self) -> &LanguageServerBinary {
        &self.binary
    }
1376
    /// Send a RPC request to the language server.
    ///
    /// `params` is the payload for request type `T`, and `request_timeout` is
    /// forwarded to `request_internal` as the timeout for this request. The returned
    /// future does not borrow `self`.
    ///
    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
    pub fn request<T: request::Request>(
        &self,
        params: T::Params,
        request_timeout: Duration,
    ) -> impl LspRequestFuture<T::Result> + use<T>
    where
        T::Result: 'static + Send,
    {
        Self::request_internal::<T>(
            &self.next_id,
            &self.response_handlers,
            &self.outbound_tx,
            &self.notification_tx,
            &self.executor,
            request_timeout,
            params,
        )
    }
1398
    /// Send a RPC request to the language server with a custom timer.
    /// Once the attached future becomes ready, the request will time out with the provided output message.
    ///
    /// `timer` is any future resolving to a `String`; it is forwarded unchanged to
    /// `request_internal_with_timer`, as is `params`. The returned future does not
    /// borrow `self`.
    ///
    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
    pub fn request_with_timer<T: request::Request, U: Future<Output = String>>(
        &self,
        params: T::Params,
        timer: U,
    ) -> impl LspRequestFuture<T::Result> + use<T, U>
    where
        T::Result: 'static + Send,
    {
        Self::request_internal_with_timer::<T, U>(
            &self.next_id,
            &self.response_handlers,
            &self.outbound_tx,
            &self.notification_tx,
            &self.executor,
            timer,
            params,
        )
    }
1421
1422    fn request_internal_with_timer<T, U>(
1423        next_id: &AtomicI32,
1424        response_handlers: &Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
1425        outbound_tx: &channel::Sender<String>,
1426        notification_serializers: &channel::Sender<NotificationSerializer>,
1427        executor: &BackgroundExecutor,
1428        timer: U,
1429        params: T::Params,
1430    ) -> impl LspRequestFuture<T::Result> + use<T, U>
1431    where
1432        T::Result: 'static + Send,
1433        T: request::Request,
1434        U: Future<Output = String>,
1435    {
1436        let id = next_id.fetch_add(1, SeqCst);
1437        let message = serde_json::to_string(&Request {
1438            jsonrpc: JSON_RPC_VERSION,
1439            id: RequestId::Int(id),
1440            method: T::METHOD,
1441            params,
1442        })
1443        .expect("LSP message should be serializable to JSON");
1444
1445        let (tx, rx) = oneshot::channel();
1446        let handle_response = response_handlers
1447            .lock()
1448            .as_mut()
1449            .context("server shut down")
1450            .map(|handlers| {
1451                let executor = executor.clone();
1452                handlers.insert(
1453                    RequestId::Int(id),
1454                    Box::new(move |result| {
1455                        executor
1456                            .spawn(async move {
1457                                let response = match result {
1458                                    Ok(response) => match serde_json::from_str(&response) {
1459                                        Ok(deserialized) => Ok(deserialized),
1460                                        Err(error) => {
1461                                            log::error!("failed to deserialize response from language server: {}. response from language server: {:?}", error, response);
1462                                            Err(error).context("failed to deserialize response")
1463                                        }
1464                                    }
1465                                    Err(error) => Err(anyhow!("{}", error.message)),
1466                                };
1467                                tx.send(response).ok();
1468                            })
1469                    }),
1470                );
1471            });
1472
1473        let send = outbound_tx
1474            .try_send(message)
1475            .context("failed to write to language server's stdin");
1476
1477        let response_handlers = Arc::clone(response_handlers);
1478        let notification_serializers = notification_serializers.downgrade();
1479        let started = Instant::now();
1480        LspRequest::new(id, async move {
1481            if let Err(e) = handle_response {
1482                return ConnectionResult::Result(Err(e));
1483            }
1484            if let Err(e) = send {
1485                return ConnectionResult::Result(Err(e));
1486            }
1487
1488            let cancel_on_drop = util::defer(move || {
1489                if let Some(notification_serializers) = notification_serializers.upgrade() {
1490                    Self::notify_internal::<notification::Cancel>(
1491                        &notification_serializers,
1492                        CancelParams {
1493                            id: NumberOrString::Number(id),
1494                        },
1495                    )
1496                    .ok();
1497                }
1498            });
1499
1500            let method = T::METHOD;
1501            select! {
1502                response = rx.fuse() => {
1503                    let elapsed = started.elapsed();
1504                    log::trace!("Took {elapsed:?} to receive response to {method:?} id {id}");
1505                    cancel_on_drop.abort();
1506                    match response {
1507                        Ok(response_result) => ConnectionResult::Result(response_result),
1508                        Err(Canceled) => {
1509                            log::error!("Server reset connection for a request {method:?} id {id}");
1510                            ConnectionResult::ConnectionReset
1511                        },
1512                    }
1513                }
1514
1515                message = timer.fuse() => {
1516                    log::error!("Cancelled LSP request task for {method:?} id {id} {message}");
1517                    match response_handlers
1518                        .lock()
1519                        .as_mut()
1520                        .context("server shut down") {
1521                            Ok(handlers) => {
1522                                handlers.remove(&RequestId::Int(id));
1523                                ConnectionResult::Timeout
1524                            }
1525                            Err(e) => ConnectionResult::Result(Err(e)),
1526                        }
1527                }
1528            }
1529        })
1530    }
1531
1532    fn request_internal<T>(
1533        next_id: &AtomicI32,
1534        response_handlers: &Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
1535        outbound_tx: &channel::Sender<String>,
1536        notification_serializers: &channel::Sender<NotificationSerializer>,
1537        executor: &BackgroundExecutor,
1538        request_timeout: Duration,
1539        params: T::Params,
1540    ) -> impl LspRequestFuture<T::Result> + use<T>
1541    where
1542        T::Result: 'static + Send,
1543        T: request::Request,
1544    {
1545        Self::request_internal_with_timer::<T, _>(
1546            next_id,
1547            response_handlers,
1548            outbound_tx,
1549            notification_serializers,
1550            executor,
1551            Self::request_timeout_future(executor.clone(), request_timeout),
1552            params,
1553        )
1554    }
1555
1556    /// Internal function to return a Future from a configured timeout duration.
1557    /// If the duration is zero or `Duration::MAX`, the returned future never completes.
1558    fn request_timeout_future(
1559        executor: BackgroundExecutor,
1560        request_timeout: Duration,
1561    ) -> impl Future<Output = String> {
1562        if request_timeout == Duration::MAX || request_timeout == Duration::ZERO {
1563            return Either::Left(future::pending::<String>());
1564        }
1565
1566        Either::Right(
1567            executor
1568                .timer(request_timeout)
1569                .map(move |_| format!("which took over {request_timeout:?}")),
1570        )
1571    }
1572
1573    /// Obtain a request timer for the LSP.
1574    pub fn request_timer(&self, timeout: Duration) -> impl Future<Output = String> {
1575        Self::request_timeout_future(self.executor.clone(), timeout)
1576    }
1577
1578    /// Sends a RPC notification to the language server.
1579    ///
1580    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
1581    pub fn notify<T: notification::Notification>(&self, params: T::Params) -> Result<()> {
1582        let outbound = self.notification_tx.clone();
1583        Self::notify_internal::<T>(&outbound, params)
1584    }
1585
1586    fn notify_internal<T: notification::Notification>(
1587        outbound_tx: &channel::Sender<NotificationSerializer>,
1588        params: T::Params,
1589    ) -> Result<()> {
1590        let serializer = NotificationSerializer(Box::new(move || {
1591            serde_json::to_string(&Notification {
1592                jsonrpc: JSON_RPC_VERSION,
1593                method: T::METHOD,
1594                params,
1595            })
1596            .unwrap()
1597        }));
1598
1599        outbound_tx.send_blocking(serializer)?;
1600        Ok(())
1601    }
1602
1603    /// Add new workspace folder to the list.
1604    pub fn add_workspace_folder(&self, uri: Uri) {
1605        if self
1606            .capabilities()
1607            .workspace
1608            .and_then(|ws| {
1609                ws.workspace_folders.and_then(|folders| {
1610                    folders
1611                        .change_notifications
1612                        .map(|caps| matches!(caps, OneOf::Left(false)))
1613                })
1614            })
1615            .unwrap_or(true)
1616        {
1617            return;
1618        }
1619
1620        let Some(workspace_folders) = self.workspace_folders.as_ref() else {
1621            return;
1622        };
1623        let is_new_folder = workspace_folders.lock().insert(uri.clone());
1624        if is_new_folder {
1625            let params = DidChangeWorkspaceFoldersParams {
1626                event: WorkspaceFoldersChangeEvent {
1627                    added: vec![WorkspaceFolder {
1628                        uri,
1629                        name: String::default(),
1630                    }],
1631                    removed: vec![],
1632                },
1633            };
1634            self.notify::<DidChangeWorkspaceFolders>(params).ok();
1635        }
1636    }
1637
1638    /// Remove existing workspace folder from the list.
1639    pub fn remove_workspace_folder(&self, uri: Uri) {
1640        if self
1641            .capabilities()
1642            .workspace
1643            .and_then(|ws| {
1644                ws.workspace_folders.and_then(|folders| {
1645                    folders
1646                        .change_notifications
1647                        .map(|caps| !matches!(caps, OneOf::Left(false)))
1648                })
1649            })
1650            .unwrap_or(true)
1651        {
1652            return;
1653        }
1654        let Some(workspace_folders) = self.workspace_folders.as_ref() else {
1655            return;
1656        };
1657        let was_removed = workspace_folders.lock().remove(&uri);
1658        if was_removed {
1659            let params = DidChangeWorkspaceFoldersParams {
1660                event: WorkspaceFoldersChangeEvent {
1661                    added: vec![],
1662                    removed: vec![WorkspaceFolder {
1663                        uri,
1664                        name: String::default(),
1665                    }],
1666                },
1667            };
1668            self.notify::<DidChangeWorkspaceFolders>(params).ok();
1669        }
1670    }
1671    pub fn set_workspace_folders(&self, folders: BTreeSet<Uri>) {
1672        let Some(workspace_folders) = self.workspace_folders.as_ref() else {
1673            return;
1674        };
1675        let mut workspace_folders = workspace_folders.lock();
1676
1677        let old_workspace_folders = std::mem::take(&mut *workspace_folders);
1678        let added: Vec<_> = folders
1679            .difference(&old_workspace_folders)
1680            .map(|uri| WorkspaceFolder {
1681                uri: uri.clone(),
1682                name: String::default(),
1683            })
1684            .collect();
1685
1686        let removed: Vec<_> = old_workspace_folders
1687            .difference(&folders)
1688            .map(|uri| WorkspaceFolder {
1689                uri: uri.clone(),
1690                name: String::default(),
1691            })
1692            .collect();
1693        *workspace_folders = folders;
1694        let should_notify = !added.is_empty() || !removed.is_empty();
1695        if should_notify {
1696            drop(workspace_folders);
1697            let params = DidChangeWorkspaceFoldersParams {
1698                event: WorkspaceFoldersChangeEvent { added, removed },
1699            };
1700            self.notify::<DidChangeWorkspaceFolders>(params).ok();
1701        }
1702    }
1703
1704    pub fn workspace_folders(&self) -> BTreeSet<Uri> {
1705        self.workspace_folders.as_ref().map_or_else(
1706            || BTreeSet::from_iter([self.root_uri.clone()]),
1707            |folders| folders.lock().clone(),
1708        )
1709    }
1710
1711    pub fn register_buffer(
1712        &self,
1713        uri: Uri,
1714        language_id: String,
1715        version: i32,
1716        initial_text: String,
1717    ) {
1718        self.notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
1719            text_document: TextDocumentItem::new(uri, language_id, version, initial_text),
1720        })
1721        .ok();
1722    }
1723
1724    pub fn unregister_buffer(&self, uri: Uri) {
1725        self.notify::<notification::DidCloseTextDocument>(DidCloseTextDocumentParams {
1726            text_document: TextDocumentIdentifier::new(uri),
1727        })
1728        .ok();
1729    }
1730}
1731
1732impl Drop for LanguageServer {
1733    fn drop(&mut self) {
1734        if let Some(shutdown) = self.shutdown() {
1735            self.executor.spawn(shutdown).detach();
1736        }
1737    }
1738}
1739
1740impl Subscription {
1741    /// Detaching a subscription handle prevents it from unsubscribing on drop.
1742    pub fn detach(&mut self) {
1743        match self {
1744            Subscription::Notification {
1745                notification_handlers,
1746                ..
1747            } => *notification_handlers = None,
1748            Subscription::Io { io_handlers, .. } => *io_handlers = None,
1749        }
1750    }
1751}
1752
// Displays a `LanguageServerId` exactly like the value it wraps.
impl fmt::Display for LanguageServerId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate to the inner value with the caller's formatter, so any
        // formatting options set on `f` are passed along unchanged.
        self.0.fmt(f)
    }
}
1758
1759impl fmt::Debug for LanguageServer {
1760    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1761        f.debug_struct("LanguageServer")
1762            .field("id", &self.server_id.0)
1763            .field("name", &self.name)
1764            .finish_non_exhaustive()
1765    }
1766}
1767
1768impl fmt::Debug for LanguageServerBinary {
1769    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1770        let mut debug = f.debug_struct("LanguageServerBinary");
1771        debug.field("path", &self.path);
1772        debug.field("arguments", &self.arguments);
1773
1774        if let Some(env) = &self.env {
1775            let redacted_env: BTreeMap<String, String> = env
1776                .iter()
1777                .map(|(key, value)| {
1778                    let redacted_value = if redact::should_redact(key) {
1779                        "REDACTED".to_string()
1780                    } else {
1781                        value.clone()
1782                    };
1783                    (key.clone(), redacted_value)
1784                })
1785                .collect();
1786            debug.field("env", &Some(redacted_env));
1787        } else {
1788            debug.field("env", &self.env);
1789        }
1790
1791        debug.finish()
1792    }
1793}
1794
1795impl Drop for Subscription {
1796    fn drop(&mut self) {
1797        match self {
1798            Subscription::Notification {
1799                method,
1800                notification_handlers,
1801            } => {
1802                if let Some(handlers) = notification_handlers {
1803                    handlers.lock().remove(method);
1804                }
1805            }
1806            Subscription::Io { id, io_handlers } => {
1807                if let Some(io_handlers) = io_handlers.as_ref().and_then(|h| h.upgrade()) {
1808                    io_handlers.lock().remove(id);
1809                }
1810            }
1811        }
1812    }
1813}
1814
/// Mock language server for use in tests.
///
/// Built together with a regular [`LanguageServer`] by
/// [`FakeLanguageServer::new`]; the two talk to each other over in-memory
/// pipes.
#[cfg(any(test, feature = "test-support"))]
#[derive(Clone)]
pub struct FakeLanguageServer {
    /// The binary description that was passed to `new`.
    pub binary: LanguageServerBinary,
    /// The `LanguageServer` instance acting as the server end of the pipes.
    pub server: Arc<LanguageServer>,
    /// Receives `(method, serialized params)` pairs forwarded by the message
    /// callback that `new` installs on `server`.
    notifications_rx: channel::Receiver<(String, String)>,
}
1823
#[cfg(any(test, feature = "test-support"))]
impl FakeLanguageServer {
    /// Builds a connected pair: a regular [`LanguageServer`] and the fake
    /// server it talks to.
    ///
    /// Two in-memory pipes link them, with each side reading what the other
    /// writes, and both share one workspace-folder set. The fake answers
    /// `initialize` with the given `capabilities` and `name`, and answers
    /// `shutdown` with success.
    pub fn new(
        server_id: LanguageServerId,
        binary: LanguageServerBinary,
        name: String,
        capabilities: ServerCapabilities,
        cx: &mut AsyncApp,
    ) -> (LanguageServer, FakeLanguageServer) {
        let (stdin_writer, stdin_reader) = async_pipe::pipe();
        let (stdout_writer, stdout_reader) = async_pipe::pipe();
        let (notifications_tx, notifications_rx) = channel::unbounded();

        let server_name = LanguageServerName(name.clone().into());
        let shared_folders: Arc<Mutex<BTreeSet<Uri>>> = Default::default();

        // The regular server: writes to "stdin" and reads from "stdout".
        let mut server = LanguageServer::new_internal(
            server_id,
            server_name.clone(),
            stdin_writer,
            stdout_reader,
            None::<async_pipe::PipeReader>,
            Arc::new(Mutex::new(None)),
            None,
            None,
            binary.clone(),
            Self::root_path(),
            Some(Arc::clone(&shared_folders)),
            cx,
            |_| false,
        );
        server.process_name = Arc::from(name.as_str());

        // The fake's side: same pipes with the directions swapped. Its
        // callback forwards every message it is given to `notifications_rx`.
        let mut fake_side = LanguageServer::new_internal(
            server_id,
            server_name,
            stdout_writer,
            stdin_reader,
            None::<async_pipe::PipeReader>,
            Arc::new(Mutex::new(None)),
            None,
            None,
            binary.clone(),
            Self::root_path(),
            Some(shared_folders),
            cx,
            move |message| {
                let method = message.method.to_string();
                let params = message.params.as_ref().unwrap_or(&Value::Null).to_string();
                notifications_tx.try_send((method, params)).ok();
                true
            },
        );
        fake_side.process_name = name.as_str().into();

        let fake = FakeLanguageServer {
            binary,
            server: Arc::new(fake_side),
            notifications_rx,
        };

        fake.set_request_handler::<request::Initialize, _, _>(move |_, _| {
            let result = InitializeResult {
                capabilities: capabilities.clone(),
                server_info: Some(ServerInfo {
                    name: name.clone(),
                    ..Default::default()
                }),
            };
            async move { Ok(result) }
        });

        fake.set_request_handler::<request::Shutdown, _, _>(|_, _| async move { Ok(()) });

        (server, fake)
    }

    /// Root URI used for both servers: `C:/` on Windows, `/` elsewhere.
    fn root_path() -> Uri {
        let root = if cfg!(target_os = "windows") {
            "C:/"
        } else {
            "/"
        };
        Uri::from_file_path(root).unwrap()
    }
}
1920
#[cfg(any(test, feature = "test-support"))]
impl LanguageServer {
    /// Server capabilities with a fixed set of providers switched on:
    /// document highlights, code actions, formatting, range formatting,
    /// definitions, workspace symbols, implementations and type definitions.
    /// Every other capability keeps its default value.
    pub fn full_capabilities() -> ServerCapabilities {
        /// A provider advertised as a plain `true`.
        fn enabled<Options>() -> Option<OneOf<bool, Options>> {
            Some(OneOf::Left(true))
        }

        ServerCapabilities {
            definition_provider: enabled(),
            document_highlight_provider: enabled(),
            document_formatting_provider: enabled(),
            document_range_formatting_provider: enabled(),
            workspace_symbol_provider: enabled(),
            code_action_provider: Some(CodeActionProviderCapability::Simple(true)),
            implementation_provider: Some(ImplementationProviderCapability::Simple(true)),
            type_definition_provider: Some(TypeDefinitionProviderCapability::Simple(true)),
            ..ServerCapabilities::default()
        }
    }
}
1937
#[cfg(any(test, feature = "test-support"))]
impl FakeLanguageServer {
    /// Sends a notification from the fake server, ignoring send failures.
    ///
    /// See [`LanguageServer::notify`].
    pub fn notify<T: notification::Notification>(&self, params: T::Params) {
        let _ = self.server.notify::<T>(params);
    }

    /// Sends a request from the fake server and waits for its outcome.
    ///
    /// See [`LanguageServer::request`].
    pub async fn request<T>(
        &self,
        params: T::Params,
        timeout: Duration,
    ) -> ConnectionResult<T::Result>
    where
        T: request::Request,
        T::Result: 'static + Send,
    {
        let pending = self.server.request::<T>(params, timeout);
        pending.await
    }

    /// Like [`Self::try_receive_notification`], but panics if the channel
    /// closes before a notification of type `T` arrives.
    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
        let received = self.try_receive_notification::<T>().await;
        received.unwrap()
    }

    /// Consumes the notification channel until it finds a notification for the specified type.
    ///
    /// Messages with a different method are logged and discarded. Returns
    /// `None` once the channel is closed. Panics if the params of a matching
    /// message cannot be deserialized as `T::Params`.
    pub async fn try_receive_notification<T: notification::Notification>(
        &mut self,
    ) -> Option<T::Params> {
        while let Ok((method, params)) = self.notifications_rx.recv().await {
            if method == T::METHOD {
                let parsed = serde_json::from_str::<T::Params>(&params).unwrap();
                return Some(parsed);
            }
            log::info!("skipping message in fake language server {:?}", params);
        }
        None
    }

    /// Registers a handler for a specific kind of request. Removes any existing handler for specified request type.
    ///
    /// The returned receiver gets one `()` each time a handler invocation's
    /// future finishes or is dropped after it started running.
    pub fn set_request_handler<T, F, Fut>(
        &self,
        mut handler: F,
    ) -> futures::channel::mpsc::UnboundedReceiver<()>
    where
        T: 'static + request::Request,
        T::Params: 'static + Send,
        F: 'static + Send + FnMut(T::Params, gpui::AsyncApp) -> Fut,
        Fut: 'static + Future<Output = Result<T::Result>>,
    {
        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
        self.server.remove_request_handler::<T>();

        let subscription = self.server.on_request::<T, _, _>(move |params, cx| {
            // The handler runs right away; only its future is delayed.
            let response = handler(params, cx.clone());
            let executor = cx.background_executor().clone();
            let responded_tx = responded_tx.clone();
            async move {
                // Signal on every exit path once polling has begun.
                let _signal_on_exit = gpui_util::defer(move || {
                    responded_tx.unbounded_send(()).ok();
                });
                executor.simulate_random_delay().await;
                response.await
            }
        });
        subscription.detach();

        responded_rx
    }

    /// Registers a handler for a specific kind of notification. Removes any existing handler for specified notification type.
    ///
    /// The returned receiver gets one `()` after each handler invocation.
    pub fn handle_notification<T, F>(
        &self,
        mut handler: F,
    ) -> futures::channel::mpsc::UnboundedReceiver<()>
    where
        T: 'static + notification::Notification,
        T::Params: 'static + Send,
        F: 'static + Send + FnMut(T::Params, gpui::AsyncApp),
    {
        let (handled_tx, handled_rx) = futures::channel::mpsc::unbounded();
        self.server.remove_notification_handler::<T>();

        let subscription = self.server.on_notification::<T, _>(move |params, cx| {
            handler(params, cx.clone());
            handled_tx.unbounded_send(()).ok();
        });
        subscription.detach();

        handled_rx
    }

    /// Removes any existing handler for specified request type.
    pub fn remove_request_handler<T>(&mut self)
    where
        T: 'static + request::Request,
    {
        self.server.remove_request_handler::<T>();
    }

    /// Simulate that the server has started work and notifies about its progress with the specified token.
    ///
    /// Uses a default `WorkDoneProgressBegin` payload and a default (zero)
    /// request timeout.
    pub async fn start_progress(&self, token: impl Into<String>) {
        self.start_progress_with(token, WorkDoneProgressBegin::default(), Duration::default())
            .await
    }

    /// Simulates the start of server work with a caller-chosen payload.
    ///
    /// First issues a `window/workDoneProgress/create` request for `token`
    /// (panicking if it does not succeed), then sends the `Begin` progress
    /// notification carrying `progress`.
    pub async fn start_progress_with(
        &self,
        token: impl Into<String>,
        progress: WorkDoneProgressBegin,
        request_timeout: Duration,
    ) {
        let token = token.into();

        let create_params = WorkDoneProgressCreateParams {
            token: NumberOrString::String(token.clone()),
        };
        self.request::<request::WorkDoneProgressCreate>(create_params, request_timeout)
            .await
            .into_response()
            .unwrap();

        let begin_params = ProgressParams {
            token: NumberOrString::String(token),
            value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(progress)),
        };
        self.notify::<notification::Progress>(begin_params);
    }

    /// Simulate that the server has completed work and notifies about that with the specified token.
    pub fn end_progress(&self, token: impl Into<String>) {
        let end_params = ProgressParams {
            token: NumberOrString::String(token.into()),
            value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
        };
        self.notify::<notification::Progress>(end_params);
    }
}
2075
#[cfg(test)]
mod tests {
    use super::*;
    use gpui::TestAppContext;
    use std::str::FromStr;

    /// Initializes test logging once, before any test runs.
    #[ctor::ctor]
    fn init_logger() {
        zlog::init_test();
    }

    /// End-to-end smoke test of a `LanguageServer` talking to a
    /// `FakeLanguageServer`: notifications flow in both directions, and
    /// dropping the server makes the fake receive `exit`.
    #[gpui::test]
    async fn test_fake(cx: &mut TestAppContext) {
        cx.update(|cx| {
            release_channel::init(semver::Version::new(0, 0, 0), cx);
        });
        let (server, mut fake) = FakeLanguageServer::new(
            LanguageServerId(0),
            LanguageServerBinary {
                path: "path/to/language-server".into(),
                arguments: vec![],
                env: None,
            },
            "the-lsp".to_string(),
            Default::default(),
            &mut cx.to_async(),
        );

        // Capture what the fake sends to the client so it can be asserted on.
        let (message_tx, message_rx) = channel::unbounded();
        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
        server
            .on_notification::<notification::ShowMessage, _>(move |params, _| {
                message_tx.try_send(params).unwrap()
            })
            .detach();
        server
            .on_notification::<notification::PublishDiagnostics, _>(move |params, _| {
                diagnostics_tx.try_send(params).unwrap()
            })
            .detach();

        let server = cx
            .update(|cx| {
                let params = server.default_initialize_params(false, false, cx);
                let configuration = DidChangeConfigurationParams {
                    settings: Default::default(),
                };
                server.initialize(
                    params,
                    configuration.into(),
                    DEFAULT_LSP_REQUEST_TIMEOUT,
                    cx,
                )
            })
            .await
            .unwrap();

        // Client -> fake.
        server
            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
                text_document: TextDocumentItem::new(
                    Uri::from_str("file://a/b").unwrap(),
                    "rust".to_string(),
                    0,
                    "".to_string(),
                ),
            })
            .unwrap();
        assert_eq!(
            fake.receive_notification::<notification::DidOpenTextDocument>()
                .await
                .text_document
                .uri
                .as_str(),
            "file://a/b"
        );

        // Fake -> client.
        fake.notify::<notification::ShowMessage>(ShowMessageParams {
            typ: MessageType::ERROR,
            message: "ok".to_string(),
        });
        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
            uri: Uri::from_str("file://b/c").unwrap(),
            version: Some(5),
            diagnostics: vec![],
        });
        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
        assert_eq!(
            diagnostics_rx.recv().await.unwrap().uri.as_str(),
            "file://b/c"
        );

        fake.set_request_handler::<request::Shutdown, _, _>(|_, _| async move { Ok(()) });

        // Dropping the server kicks off its shutdown sequence.
        drop(server);
        cx.run_until_parked();
        fake.receive_notification::<notification::Exit>().await;
    }

    /// A string id that happens to contain only digits must stay a string.
    #[gpui::test]
    fn test_deserialize_string_digit_id() {
        let json = r#"{"jsonrpc":"2.0","id":"2","method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
        let message = serde_json::from_str::<NotificationOrRequest>(json)
            .expect("message with string id should be parsed");
        let expected_id = RequestId::Str("2".to_string());
        assert_eq!(message.id, Some(expected_id));
    }

    /// An arbitrary string id is accepted and preserved.
    #[gpui::test]
    fn test_deserialize_string_id() {
        let json = r#"{"jsonrpc":"2.0","id":"anythingAtAll","method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
        let message = serde_json::from_str::<NotificationOrRequest>(json)
            .expect("message with string id should be parsed");
        let expected_id = RequestId::Str("anythingAtAll".to_string());
        assert_eq!(message.id, Some(expected_id));
    }

    /// A numeric id is parsed as an integer id.
    #[gpui::test]
    fn test_deserialize_int_id() {
        let json = r#"{"jsonrpc":"2.0","id":2,"method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
        let message = serde_json::from_str::<NotificationOrRequest>(json)
            .expect("message with int id should be parsed");
        let expected_id = RequestId::Int(2);
        assert_eq!(message.id, Some(expected_id));
    }

    #[test]
    fn test_serialize_has_no_nulls() {
        // Ensure we're not setting both result and error variants. (ticket #10595)
        let no_tag = Response::<u32> {
            jsonrpc: "",
            id: RequestId::Int(0),
            value: LspResult::Ok(None),
        };
        assert_eq!(
            serde_json::to_string(&no_tag).unwrap(),
            "{\"jsonrpc\":\"\",\"id\":0,\"result\":null}"
        );
        let no_tag = Response::<u32> {
            jsonrpc: "",
            id: RequestId::Int(0),
            value: LspResult::Error(None),
        };
        assert_eq!(
            serde_json::to_string(&no_tag).unwrap(),
            "{\"jsonrpc\":\"\",\"id\":0,\"error\":null}"
        );
    }

    /// The default initialize params must carry both the deprecated
    /// `root_uri` and `root_path`, and they must agree with each other.
    #[gpui::test]
    async fn test_initialize_params_has_root_path_and_root_uri(cx: &mut TestAppContext) {
        cx.update(|cx| {
            release_channel::init(semver::Version::new(0, 0, 0), cx);
        });
        let (server, _fake) = FakeLanguageServer::new(
            LanguageServerId(0),
            LanguageServerBinary {
                path: "path/to/language-server".into(),
                arguments: vec![],
                env: None,
            },
            "test-lsp".to_string(),
            Default::default(),
            &mut cx.to_async(),
        );

        let params = cx.update(|cx| server.default_initialize_params(false, false, cx));

        // Both fields are deprecated in lsp_types but are exactly what this
        // test is checking.
        #[allow(deprecated)]
        let root_uri = params.root_uri.expect("root_uri should be set");
        #[allow(deprecated)]
        let root_path = params.root_path.expect("root_path should be set");

        let expected_path = root_uri
            .to_file_path()
            .expect("root_uri should be a valid file path");
        assert_eq!(
            root_path,
            expected_path.to_string_lossy(),
            "root_path should be derived from root_uri"
        );
    }
}