lsp.rs

   1mod input_handler;
   2
   3pub use lsp_types::request::*;
   4pub use lsp_types::*;
   5
   6use anyhow::{Context as _, Result, anyhow};
   7use collections::{BTreeMap, HashMap};
   8use futures::{
   9    AsyncRead, AsyncWrite, Future, FutureExt,
  10    channel::oneshot::{self, Canceled},
  11    future::{self, Either},
  12    io::BufWriter,
  13    select,
  14};
  15use gpui::{App, AppContext as _, AsyncApp, BackgroundExecutor, SharedString, Task};
  16use notification::DidChangeWorkspaceFolders;
  17use parking_lot::{Mutex, RwLock};
  18use postage::{barrier, prelude::Stream};
  19use schemars::JsonSchema;
  20use serde::{Deserialize, Serialize, de::DeserializeOwned};
  21use serde_json::{Value, json, value::RawValue};
  22use smol::{
  23    channel,
  24    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
  25};
  26use util::command::{Child, Stdio};
  27
  28use std::path::Path;
  29use std::{
  30    any::TypeId,
  31    collections::BTreeSet,
  32    ffi::{OsStr, OsString},
  33    fmt,
  34    io::Write,
  35    ops::DerefMut,
  36    path::PathBuf,
  37    pin::Pin,
  38    sync::{
  39        Arc, Weak,
  40        atomic::{AtomicI32, Ordering::SeqCst},
  41    },
  42    task::Poll,
  43    time::{Duration, Instant},
  44};
  45use util::{ConnectionResult, ResultExt, TryFutureExt, redact};
  46
  47const JSON_RPC_VERSION: &str = "2.0";
  48const CONTENT_LEN_HEADER: &str = "Content-Length: ";
  49
  50/// The default amount of time to wait while initializing or fetching LSP servers, in seconds.
  51///
  52/// Should not be used (in favor of DEFAULT_LSP_REQUEST_TIMEOUT) and is exported solely for use inside ProjectSettings defaults.
  53pub const DEFAULT_LSP_REQUEST_TIMEOUT_SECS: u64 = 120;
  54/// A timeout representing the value of [DEFAULT_LSP_REQUEST_TIMEOUT_SECS].
  55///
  56/// Should **only be used** in tests and as a fallback when a corresponding config value cannot be obtained!
  57pub const DEFAULT_LSP_REQUEST_TIMEOUT: Duration =
  58    Duration::from_secs(DEFAULT_LSP_REQUEST_TIMEOUT_SECS);
  59
  60/// The shutdown timeout for LSP servers (including Prettier/Copilot).
  61const SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
  62
  63type NotificationHandler = Box<dyn Send + FnMut(Option<RequestId>, Value, &mut AsyncApp)>;
  64type PendingRespondTasks = Arc<Mutex<HashMap<RequestId, Task<()>>>>;
  65type ResponseHandler = Box<dyn Send + FnOnce(Result<String, Error>) -> Task<()>>;
  66type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
  67
  68/// Kind of language server stdio given to an IO handler.
  69#[derive(Debug, Clone, Copy)]
  70pub enum IoKind {
  71    StdOut,
  72    StdIn,
  73    StdErr,
  74}
  75
  76/// Represents a launchable language server. This can either be a standalone binary or the path
  77/// to a runtime with arguments to instruct it to launch the actual language server file.
  78#[derive(Clone, Serialize)]
  79pub struct LanguageServerBinary {
  80    pub path: PathBuf,
  81    pub arguments: Vec<OsString>,
  82    pub env: Option<HashMap<String, String>>,
  83}
  84
  85/// Configures the search (and installation) of language servers.
  86#[derive(Debug, Clone)]
  87pub struct LanguageServerBinaryOptions {
  88    /// Whether the adapter should look at the users system
  89    pub allow_path_lookup: bool,
  90    /// Whether the adapter should download its own version
  91    pub allow_binary_download: bool,
  92    /// Whether the adapter should download a pre-release version
  93    pub pre_release: bool,
  94}
  95
  96struct NotificationSerializer(Box<dyn FnOnce() -> String + Send + Sync>);
  97
  98/// A running language server process.
  99pub struct LanguageServer {
 100    server_id: LanguageServerId,
 101    next_id: AtomicI32,
 102    outbound_tx: channel::Sender<String>,
 103    notification_tx: channel::Sender<NotificationSerializer>,
 104    name: LanguageServerName,
 105    version: Option<SharedString>,
 106    process_name: Arc<str>,
 107    binary: LanguageServerBinary,
 108    capabilities: RwLock<ServerCapabilities>,
 109    /// Configuration sent to the server, stored for display in the language server logs
 110    /// buffer. This is represented as the message sent to the LSP in order to avoid cloning it (can
 111    /// be large in cases like sending schemas to the json server).
 112    configuration: Arc<DidChangeConfigurationParams>,
 113    code_action_kinds: Option<Vec<CodeActionKind>>,
 114    notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
 115    response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
 116    /// Tasks spawned by `on_custom_request` to compute responses. Tracked so that
 117    /// incoming `$/cancelRequest` notifications can cancel them by dropping the task.
 118    pending_respond_tasks: PendingRespondTasks,
 119    io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 120    executor: BackgroundExecutor,
 121    #[allow(clippy::type_complexity)]
 122    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
 123    output_done_rx: Mutex<Option<barrier::Receiver>>,
 124    server: Arc<Mutex<Option<Child>>>,
 125    workspace_folders: Option<Arc<Mutex<BTreeSet<Uri>>>>,
 126    root_uri: Uri,
 127}
 128
 129#[derive(Clone, Debug, PartialEq, Eq, Hash)]
 130pub enum LanguageServerSelector {
 131    Id(LanguageServerId),
 132    Name(LanguageServerName),
 133}
 134
 135/// Identifies a running language server.
 136#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
 137#[repr(transparent)]
 138pub struct LanguageServerId(pub usize);
 139
 140impl LanguageServerId {
 141    pub fn from_proto(id: u64) -> Self {
 142        Self(id as usize)
 143    }
 144
 145    pub fn to_proto(self) -> u64 {
 146        self.0 as u64
 147    }
 148}
 149
 150/// A name of a language server.
 151#[derive(
 152    Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize, JsonSchema,
 153)]
 154#[serde(transparent)]
 155pub struct LanguageServerName(pub SharedString);
 156
 157impl std::fmt::Display for LanguageServerName {
 158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 159        std::fmt::Display::fmt(&self.0, f)
 160    }
 161}
 162
 163impl AsRef<str> for LanguageServerName {
 164    fn as_ref(&self) -> &str {
 165        self.0.as_ref()
 166    }
 167}
 168
 169impl AsRef<OsStr> for LanguageServerName {
 170    fn as_ref(&self) -> &OsStr {
 171        self.0.as_ref().as_ref()
 172    }
 173}
 174
 175impl LanguageServerName {
 176    pub const fn new_static(s: &'static str) -> Self {
 177        Self(SharedString::new_static(s))
 178    }
 179
 180    pub fn from_proto(s: String) -> Self {
 181        Self(s.into())
 182    }
 183}
 184
 185impl<'a> From<&'a str> for LanguageServerName {
 186    fn from(str: &'a str) -> LanguageServerName {
 187        LanguageServerName(str.to_string().into())
 188    }
 189}
 190
 191impl PartialEq<str> for LanguageServerName {
 192    fn eq(&self, other: &str) -> bool {
 193        self.0 == other
 194    }
 195}
 196
 197/// Handle to a language server RPC activity subscription.
 198pub enum Subscription {
 199    Notification {
 200        method: &'static str,
 201        notification_handlers: Option<Arc<Mutex<HashMap<&'static str, NotificationHandler>>>>,
 202    },
 203    Io {
 204        id: i32,
 205        io_handlers: Option<Weak<Mutex<HashMap<i32, IoHandler>>>>,
 206    },
 207}
 208
 209/// Language server protocol RPC request message ID.
 210///
 211/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
 212#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
 213#[serde(untagged)]
 214pub enum RequestId {
 215    Int(i32),
 216    Str(String),
 217}
 218
 219fn is_unit<T: 'static>(_: &T) -> bool {
 220    TypeId::of::<T>() == TypeId::of::<()>()
 221}
 222
 223/// Language server protocol RPC request message.
 224///
 225/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
 226#[derive(Serialize, Deserialize)]
 227pub struct Request<'a, T>
 228where
 229    T: 'static,
 230{
 231    jsonrpc: &'static str,
 232    id: RequestId,
 233    method: &'a str,
 234    #[serde(default, skip_serializing_if = "is_unit")]
 235    params: T,
 236}
 237
 238/// Language server protocol RPC request response message before it is deserialized into a concrete type.
 239#[derive(Serialize, Deserialize)]
 240struct AnyResponse<'a> {
 241    jsonrpc: &'a str,
 242    id: RequestId,
 243    #[serde(default)]
 244    error: Option<Error>,
 245    #[serde(borrow)]
 246    result: Option<&'a RawValue>,
 247}
 248
 249/// Language server protocol RPC request response message.
 250///
 251/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#responseMessage)
 252#[derive(Serialize)]
 253struct Response<T> {
 254    jsonrpc: &'static str,
 255    id: RequestId,
 256    #[serde(flatten)]
 257    value: LspResult<T>,
 258}
 259
 260#[derive(Serialize)]
 261#[serde(rename_all = "snake_case")]
 262enum LspResult<T> {
 263    #[serde(rename = "result")]
 264    Ok(Option<T>),
 265    Error(Option<Error>),
 266}
 267
 268/// Language server protocol RPC notification message.
 269///
 270/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
 271#[derive(Serialize, Deserialize)]
 272struct Notification<'a, T>
 273where
 274    T: 'static,
 275{
 276    jsonrpc: &'static str,
 277    #[serde(borrow)]
 278    method: &'a str,
 279    #[serde(default, skip_serializing_if = "is_unit")]
 280    params: T,
 281}
 282
 283/// Language server RPC notification message before it is deserialized into a concrete type.
 284#[derive(Debug, Clone, Deserialize)]
 285struct NotificationOrRequest {
 286    #[serde(default)]
 287    id: Option<RequestId>,
 288    method: String,
 289    #[serde(default)]
 290    params: Option<Value>,
 291}
 292
 293#[derive(Debug, Serialize, Deserialize)]
 294struct Error {
 295    code: i64,
 296    message: String,
 297    #[serde(default)]
 298    data: Option<serde_json::Value>,
 299}
 300
 301pub trait LspRequestFuture<O>: Future<Output = ConnectionResult<O>> {
 302    fn id(&self) -> i32;
 303}
 304
 305struct LspRequest<F> {
 306    id: i32,
 307    request: F,
 308}
 309
 310impl<F> LspRequest<F> {
 311    pub fn new(id: i32, request: F) -> Self {
 312        Self { id, request }
 313    }
 314}
 315
 316impl<F: Future> Future for LspRequest<F> {
 317    type Output = F::Output;
 318
 319    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
 320        // SAFETY: This is standard pin projection, we're pinned so our fields must be pinned.
 321        let inner = unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().request) };
 322        inner.poll(cx)
 323    }
 324}
 325
 326impl<F, O> LspRequestFuture<O> for LspRequest<F>
 327where
 328    F: Future<Output = ConnectionResult<O>>,
 329{
 330    fn id(&self) -> i32 {
 331        self.id
 332    }
 333}
 334
 335/// Combined capabilities of the server and the adapter.
 336#[derive(Debug, Clone)]
 337pub struct AdapterServerCapabilities {
 338    // Reported capabilities by the server
 339    pub server_capabilities: ServerCapabilities,
 340    // List of code actions supported by the LspAdapter matching the server
 341    pub code_action_kinds: Option<Vec<CodeActionKind>>,
 342}
 343
 344// See the VSCode docs [1] and the LSP Spec [2]
 345//
 346// [1]: https://code.visualstudio.com/api/language-extensions/semantic-highlight-guide#standard-token-types-and-modifiers
 347// [2]: https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#semanticTokenTypes
 348pub const SEMANTIC_TOKEN_TYPES: &[SemanticTokenType] = &[
 349    SemanticTokenType::NAMESPACE,
 350    SemanticTokenType::CLASS,
 351    SemanticTokenType::ENUM,
 352    SemanticTokenType::INTERFACE,
 353    SemanticTokenType::STRUCT,
 354    SemanticTokenType::TYPE_PARAMETER,
 355    SemanticTokenType::TYPE,
 356    SemanticTokenType::PARAMETER,
 357    SemanticTokenType::VARIABLE,
 358    SemanticTokenType::PROPERTY,
 359    SemanticTokenType::ENUM_MEMBER,
 360    SemanticTokenType::DECORATOR,
 361    SemanticTokenType::FUNCTION,
 362    SemanticTokenType::METHOD,
 363    SemanticTokenType::MACRO,
 364    SemanticTokenType::new("label"), // Not in the spec, but in the docs.
 365    SemanticTokenType::COMMENT,
 366    SemanticTokenType::STRING,
 367    SemanticTokenType::KEYWORD,
 368    SemanticTokenType::NUMBER,
 369    SemanticTokenType::REGEXP,
 370    SemanticTokenType::OPERATOR,
 371    SemanticTokenType::MODIFIER, // Only in the spec, not in the docs.
 372    // Language specific things below.
 373    // C#
 374    SemanticTokenType::EVENT,
 375    // Rust
 376    SemanticTokenType::new("lifetime"),
 377];
 378pub const SEMANTIC_TOKEN_MODIFIERS: &[SemanticTokenModifier] = &[
 379    SemanticTokenModifier::DECLARATION,
 380    SemanticTokenModifier::DEFINITION,
 381    SemanticTokenModifier::READONLY,
 382    SemanticTokenModifier::STATIC,
 383    SemanticTokenModifier::DEPRECATED,
 384    SemanticTokenModifier::ABSTRACT,
 385    SemanticTokenModifier::ASYNC,
 386    SemanticTokenModifier::MODIFICATION,
 387    SemanticTokenModifier::DOCUMENTATION,
 388    SemanticTokenModifier::DEFAULT_LIBRARY,
 389    // Language specific things below.
 390    // Rust
 391    SemanticTokenModifier::new("constant"),
 392];
 393
 394impl LanguageServer {
 395    /// Starts a language server process.
 396    /// A request_timeout of zero or Duration::MAX indicates an indefinite timeout.
 397    pub fn new(
 398        stderr_capture: Arc<Mutex<Option<String>>>,
 399        server_id: LanguageServerId,
 400        server_name: LanguageServerName,
 401        binary: LanguageServerBinary,
 402        root_path: &Path,
 403        code_action_kinds: Option<Vec<CodeActionKind>>,
 404        workspace_folders: Option<Arc<Mutex<BTreeSet<Uri>>>>,
 405        cx: &mut AsyncApp,
 406    ) -> Result<Self> {
 407        let working_dir = if root_path.is_dir() {
 408            root_path
 409        } else {
 410            root_path.parent().unwrap_or_else(|| Path::new("/"))
 411        };
 412        let root_uri = Uri::from_file_path(&working_dir)
 413            .map_err(|()| anyhow!("{working_dir:?} is not a valid URI"))?;
 414        log::info!(
 415            "starting language server process. binary path: \
 416            {:?}, working directory: {:?}, args: {:?}",
 417            binary.path,
 418            working_dir,
 419            &binary.arguments
 420        );
 421        let mut command = util::command::new_command(&binary.path);
 422        command
 423            .current_dir(working_dir)
 424            .args(&binary.arguments)
 425            .envs(binary.env.clone().unwrap_or_default())
 426            .stdin(Stdio::piped())
 427            .stdout(Stdio::piped())
 428            .stderr(Stdio::piped())
 429            .kill_on_drop(true);
 430
 431        let mut server = command
 432            .spawn()
 433            .with_context(|| format!("failed to spawn command {command:?}",))?;
 434
 435        let stdin = server.stdin.take().unwrap();
 436        let stdout = server.stdout.take().unwrap();
 437        let stderr = server.stderr.take().unwrap();
 438        let server = Self::new_internal(
 439            server_id,
 440            server_name,
 441            stdin,
 442            stdout,
 443            Some(stderr),
 444            stderr_capture,
 445            Some(server),
 446            code_action_kinds,
 447            binary,
 448            root_uri,
 449            workspace_folders,
 450            cx,
 451            move |notification| {
 452                log::info!(
 453                    "Language server with id {} sent unhandled notification {}:\n{}",
 454                    server_id,
 455                    notification.method,
 456                    serde_json::to_string_pretty(&notification.params).unwrap(),
 457                );
 458                false
 459            },
 460        );
 461
 462        Ok(server)
 463    }
 464
 465    fn new_internal<Stdin, Stdout, Stderr, F>(
 466        server_id: LanguageServerId,
 467        server_name: LanguageServerName,
 468        stdin: Stdin,
 469        stdout: Stdout,
 470        stderr: Option<Stderr>,
 471        stderr_capture: Arc<Mutex<Option<String>>>,
 472        server: Option<Child>,
 473        code_action_kinds: Option<Vec<CodeActionKind>>,
 474        binary: LanguageServerBinary,
 475        root_uri: Uri,
 476        workspace_folders: Option<Arc<Mutex<BTreeSet<Uri>>>>,
 477        cx: &mut AsyncApp,
 478        on_unhandled_notification: F,
 479    ) -> Self
 480    where
 481        Stdin: AsyncWrite + Unpin + Send + 'static,
 482        Stdout: AsyncRead + Unpin + Send + 'static,
 483        Stderr: AsyncRead + Unpin + Send + 'static,
 484        F: Fn(&NotificationOrRequest) -> bool + 'static + Send + Sync + Clone,
 485    {
 486        let (outbound_tx, outbound_rx) = channel::unbounded::<String>();
 487        let (output_done_tx, output_done_rx) = barrier::channel();
 488        let notification_handlers =
 489            Arc::new(Mutex::new(HashMap::<_, NotificationHandler>::default()));
 490        let response_handlers =
 491            Arc::new(Mutex::new(Some(HashMap::<_, ResponseHandler>::default())));
 492        let pending_respond_tasks = PendingRespondTasks::default();
 493        let io_handlers = Arc::new(Mutex::new(HashMap::default()));
 494
 495        let stdout_input_task = cx.spawn({
 496            let unhandled_notification_wrapper = {
 497                let response_channel = outbound_tx.clone();
 498                async move |msg: NotificationOrRequest| {
 499                    let did_handle = on_unhandled_notification(&msg);
 500                    if !did_handle && let Some(message_id) = msg.id {
 501                        let response = AnyResponse {
 502                            jsonrpc: JSON_RPC_VERSION,
 503                            id: message_id,
 504                            error: Some(Error {
 505                                code: -32601,
 506                                message: format!("Unrecognized method `{}`", msg.method),
 507                                data: None,
 508                            }),
 509                            result: None,
 510                        };
 511                        if let Ok(response) = serde_json::to_string(&response) {
 512                            response_channel.send(response).await.ok();
 513                        }
 514                    }
 515                }
 516            };
 517            let notification_handlers = notification_handlers.clone();
 518            let response_handlers = response_handlers.clone();
 519            let io_handlers = io_handlers.clone();
 520            let pending_respond_tasks = pending_respond_tasks.clone();
 521            async move |cx| {
 522                Self::handle_incoming_messages(
 523                    stdout,
 524                    unhandled_notification_wrapper,
 525                    notification_handlers,
 526                    response_handlers,
 527                    pending_respond_tasks,
 528                    io_handlers,
 529                    cx,
 530                )
 531                .log_err()
 532                .await
 533            }
 534        });
 535        let stderr_input_task = stderr
 536            .map(|stderr| {
 537                let io_handlers = io_handlers.clone();
 538                let stderr_captures = stderr_capture.clone();
 539                cx.background_spawn(async move {
 540                    Self::handle_stderr(stderr, io_handlers, stderr_captures)
 541                        .log_err()
 542                        .await
 543                })
 544            })
 545            .unwrap_or_else(|| Task::ready(None));
 546        let input_task = cx.background_spawn(async move {
 547            let (stdout, stderr) = futures::join!(stdout_input_task, stderr_input_task);
 548            stdout.or(stderr)
 549        });
 550        let output_task = cx.background_spawn({
 551            Self::handle_outgoing_messages(
 552                stdin,
 553                outbound_rx,
 554                output_done_tx,
 555                response_handlers.clone(),
 556                io_handlers.clone(),
 557            )
 558            .log_err()
 559        });
 560
 561        let configuration = DidChangeConfigurationParams {
 562            settings: Value::Null,
 563        }
 564        .into();
 565
 566        let (notification_tx, notification_rx) = channel::unbounded::<NotificationSerializer>();
 567        cx.background_spawn({
 568            let outbound_tx = outbound_tx.clone();
 569            async move {
 570                while let Ok(serializer) = notification_rx.recv().await {
 571                    let serialized = (serializer.0)();
 572                    let Ok(_) = outbound_tx.send(serialized).await else {
 573                        return;
 574                    };
 575                }
 576                outbound_tx.close();
 577            }
 578        })
 579        .detach();
 580        Self {
 581            server_id,
 582            notification_handlers,
 583            notification_tx,
 584            response_handlers,
 585            pending_respond_tasks,
 586            io_handlers,
 587            name: server_name,
 588            version: None,
 589            process_name: binary
 590                .path
 591                .file_name()
 592                .map(|name| Arc::from(name.to_string_lossy()))
 593                .unwrap_or_default(),
 594            binary,
 595            capabilities: Default::default(),
 596            configuration,
 597            code_action_kinds,
 598            next_id: Default::default(),
 599            outbound_tx,
 600            executor: cx.background_executor().clone(),
 601            io_tasks: Mutex::new(Some((input_task, output_task))),
 602            output_done_rx: Mutex::new(Some(output_done_rx)),
 603            server: Arc::new(Mutex::new(server)),
 604            workspace_folders,
 605            root_uri,
 606        }
 607    }
 608
 609    /// List of code action kinds this language server reports being able to emit.
 610    pub fn code_action_kinds(&self) -> Option<Vec<CodeActionKind>> {
 611        self.code_action_kinds.clone()
 612    }
 613
 614    async fn handle_incoming_messages<Stdout>(
 615        stdout: Stdout,
 616        on_unhandled_notification: impl AsyncFn(NotificationOrRequest) + 'static + Send,
 617        notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
 618        response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
 619        pending_respond_tasks: PendingRespondTasks,
 620        io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 621        cx: &mut AsyncApp,
 622    ) -> anyhow::Result<()>
 623    where
 624        Stdout: AsyncRead + Unpin + Send + 'static,
 625    {
 626        use smol::stream::StreamExt;
 627        let stdout = BufReader::new(stdout);
 628        let _clear_response_handlers = util::defer({
 629            let response_handlers = response_handlers.clone();
 630            move || {
 631                response_handlers.lock().take();
 632            }
 633        });
 634        let mut input_handler = input_handler::LspStdoutHandler::new(
 635            stdout,
 636            response_handlers,
 637            io_handlers,
 638            cx.background_executor().clone(),
 639        );
 640
 641        while let Some(msg) = input_handler.incoming_messages.next().await {
 642            if msg.method == <notification::Cancel as notification::Notification>::METHOD {
 643                if let Some(params) = msg.params {
 644                    if let Ok(cancel_params) = serde_json::from_value::<CancelParams>(params) {
 645                        let id = match cancel_params.id {
 646                            NumberOrString::Number(id) => RequestId::Int(id),
 647                            NumberOrString::String(id) => RequestId::Str(id),
 648                        };
 649                        pending_respond_tasks.lock().remove(&id);
 650                    }
 651                }
 652                continue;
 653            }
 654
 655            let unhandled_message = {
 656                let mut notification_handlers = notification_handlers.lock();
 657                if let Some(handler) = notification_handlers.get_mut(msg.method.as_str()) {
 658                    handler(msg.id, msg.params.unwrap_or(Value::Null), cx);
 659                    None
 660                } else {
 661                    Some(msg)
 662                }
 663            };
 664
 665            if let Some(msg) = unhandled_message {
 666                on_unhandled_notification(msg).await;
 667            }
 668
 669            // Don't starve the main thread when receiving lots of notifications at once.
 670            smol::future::yield_now().await;
 671        }
 672        input_handler.loop_handle.await
 673    }
 674
 675    async fn handle_stderr<Stderr>(
 676        stderr: Stderr,
 677        io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 678        stderr_capture: Arc<Mutex<Option<String>>>,
 679    ) -> anyhow::Result<()>
 680    where
 681        Stderr: AsyncRead + Unpin + Send + 'static,
 682    {
 683        let mut stderr = BufReader::new(stderr);
 684        let mut buffer = Vec::new();
 685
 686        loop {
 687            buffer.clear();
 688
 689            let bytes_read = stderr.read_until(b'\n', &mut buffer).await?;
 690            if bytes_read == 0 {
 691                return Ok(());
 692            }
 693
 694            if let Ok(message) = std::str::from_utf8(&buffer) {
 695                log::trace!("incoming stderr message:{message}");
 696                for handler in io_handlers.lock().values_mut() {
 697                    handler(IoKind::StdErr, message);
 698                }
 699
 700                if let Some(stderr) = stderr_capture.lock().as_mut() {
 701                    stderr.push_str(message);
 702                }
 703            }
 704
 705            // Don't starve the main thread when receiving lots of messages at once.
 706            smol::future::yield_now().await;
 707        }
 708    }
 709
 710    async fn handle_outgoing_messages<Stdin>(
 711        stdin: Stdin,
 712        outbound_rx: channel::Receiver<String>,
 713        output_done_tx: barrier::Sender,
 714        response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
 715        io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 716    ) -> anyhow::Result<()>
 717    where
 718        Stdin: AsyncWrite + Unpin + Send + 'static,
 719    {
 720        let mut stdin = BufWriter::new(stdin);
 721        let _clear_response_handlers = util::defer({
 722            let response_handlers = response_handlers.clone();
 723            move || {
 724                response_handlers.lock().take();
 725            }
 726        });
 727        let mut content_len_buffer = Vec::new();
 728        while let Ok(message) = outbound_rx.recv().await {
 729            log::trace!("outgoing message:{}", message);
 730            for handler in io_handlers.lock().values_mut() {
 731                handler(IoKind::StdIn, &message);
 732            }
 733
 734            content_len_buffer.clear();
 735            write!(content_len_buffer, "{}", message.len()).unwrap();
 736            stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
 737            stdin.write_all(&content_len_buffer).await?;
 738            stdin.write_all("\r\n\r\n".as_bytes()).await?;
 739            stdin.write_all(message.as_bytes()).await?;
 740            stdin.flush().await?;
 741        }
 742        drop(output_done_tx);
 743        Ok(())
 744    }
 745
 746    pub fn default_initialize_params(
 747        &self,
 748        pull_diagnostics: bool,
 749        augments_syntax_tokens: bool,
 750        cx: &App,
 751    ) -> InitializeParams {
 752        let workspace_folders = self.workspace_folders.as_ref().map_or_else(
 753            || {
 754                vec![WorkspaceFolder {
 755                    name: Default::default(),
 756                    uri: self.root_uri.clone(),
 757                }]
 758            },
 759            |folders| {
 760                folders
 761                    .lock()
 762                    .iter()
 763                    .cloned()
 764                    .map(|uri| WorkspaceFolder {
 765                        name: Default::default(),
 766                        uri,
 767                    })
 768                    .collect()
 769            },
 770        );
 771
 772        #[allow(deprecated)]
 773        InitializeParams {
 774            process_id: Some(std::process::id()),
 775            root_path: Some(
 776                self.root_uri
 777                    .to_file_path()
 778                    .map(|path| path.to_string_lossy().into_owned())
 779                    .unwrap_or_else(|_| self.root_uri.path().to_string()),
 780            ),
 781            root_uri: Some(self.root_uri.clone()),
 782            initialization_options: None,
 783            capabilities: ClientCapabilities {
 784                general: Some(GeneralClientCapabilities {
 785                    position_encodings: Some(vec![PositionEncodingKind::UTF16]),
 786                    ..GeneralClientCapabilities::default()
 787                }),
 788                workspace: Some(WorkspaceClientCapabilities {
 789                    configuration: Some(true),
 790                    did_change_watched_files: Some(DidChangeWatchedFilesClientCapabilities {
 791                        dynamic_registration: Some(true),
 792                        relative_pattern_support: Some(true),
 793                    }),
 794                    did_change_configuration: Some(DynamicRegistrationClientCapabilities {
 795                        dynamic_registration: Some(true),
 796                    }),
 797                    workspace_folders: Some(true),
 798                    symbol: Some(WorkspaceSymbolClientCapabilities {
 799                        resolve_support: None,
 800                        dynamic_registration: Some(true),
 801                        ..WorkspaceSymbolClientCapabilities::default()
 802                    }),
 803                    inlay_hint: Some(InlayHintWorkspaceClientCapabilities {
 804                        refresh_support: Some(true),
 805                    }),
 806                    diagnostics: Some(DiagnosticWorkspaceClientCapabilities {
 807                        refresh_support: Some(true),
 808                    })
 809                    .filter(|_| pull_diagnostics),
 810                    code_lens: Some(CodeLensWorkspaceClientCapabilities {
 811                        refresh_support: Some(true),
 812                    }),
 813                    workspace_edit: Some(WorkspaceEditClientCapabilities {
 814                        resource_operations: Some(vec![
 815                            ResourceOperationKind::Create,
 816                            ResourceOperationKind::Rename,
 817                            ResourceOperationKind::Delete,
 818                        ]),
 819                        document_changes: Some(true),
 820                        snippet_edit_support: Some(true),
 821                        ..WorkspaceEditClientCapabilities::default()
 822                    }),
 823                    file_operations: Some(WorkspaceFileOperationsClientCapabilities {
 824                        dynamic_registration: Some(true),
 825                        did_rename: Some(true),
 826                        will_rename: Some(true),
 827                        ..WorkspaceFileOperationsClientCapabilities::default()
 828                    }),
 829                    apply_edit: Some(true),
 830                    execute_command: Some(ExecuteCommandClientCapabilities {
 831                        dynamic_registration: Some(true),
 832                    }),
 833                    semantic_tokens: Some(SemanticTokensWorkspaceClientCapabilities {
 834                        refresh_support: Some(true),
 835                    }),
 836                    ..WorkspaceClientCapabilities::default()
 837                }),
 838                text_document: Some(TextDocumentClientCapabilities {
 839                    definition: Some(GotoCapability {
 840                        link_support: Some(true),
 841                        dynamic_registration: Some(true),
 842                    }),
 843                    code_action: Some(CodeActionClientCapabilities {
 844                        code_action_literal_support: Some(CodeActionLiteralSupport {
 845                            code_action_kind: CodeActionKindLiteralSupport {
 846                                value_set: vec![
 847                                    CodeActionKind::REFACTOR.as_str().into(),
 848                                    CodeActionKind::QUICKFIX.as_str().into(),
 849                                    CodeActionKind::SOURCE.as_str().into(),
 850                                ],
 851                            },
 852                        }),
 853                        data_support: Some(true),
 854                        resolve_support: Some(CodeActionCapabilityResolveSupport {
 855                            properties: vec![
 856                                "kind".to_string(),
 857                                "diagnostics".to_string(),
 858                                "isPreferred".to_string(),
 859                                "disabled".to_string(),
 860                                "edit".to_string(),
 861                                "command".to_string(),
 862                            ],
 863                        }),
 864                        dynamic_registration: Some(true),
 865                        ..CodeActionClientCapabilities::default()
 866                    }),
 867                    completion: Some(CompletionClientCapabilities {
 868                        completion_item: Some(CompletionItemCapability {
 869                            snippet_support: Some(true),
 870                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
 871                                properties: vec![
 872                                    "additionalTextEdits".to_string(),
 873                                    "command".to_string(),
 874                                    "detail".to_string(),
 875                                    "documentation".to_string(),
 876                                    // NB: Do not have this resolved, otherwise Zed becomes slow to complete things
 877                                    // "textEdit".to_string(),
 878                                ],
 879                            }),
 880                            deprecated_support: Some(true),
 881                            tag_support: Some(TagSupport {
 882                                value_set: vec![CompletionItemTag::DEPRECATED],
 883                            }),
 884                            insert_replace_support: Some(true),
 885                            label_details_support: Some(true),
 886                            insert_text_mode_support: Some(InsertTextModeSupport {
 887                                value_set: vec![
 888                                    InsertTextMode::AS_IS,
 889                                    InsertTextMode::ADJUST_INDENTATION,
 890                                ],
 891                            }),
 892                            documentation_format: Some(vec![
 893                                MarkupKind::Markdown,
 894                                MarkupKind::PlainText,
 895                            ]),
 896                            ..CompletionItemCapability::default()
 897                        }),
 898                        insert_text_mode: Some(InsertTextMode::ADJUST_INDENTATION),
 899                        completion_list: Some(CompletionListCapability {
 900                            item_defaults: Some(vec![
 901                                "commitCharacters".to_owned(),
 902                                "editRange".to_owned(),
 903                                "insertTextMode".to_owned(),
 904                                "insertTextFormat".to_owned(),
 905                                "data".to_owned(),
 906                            ]),
 907                        }),
 908                        context_support: Some(true),
 909                        dynamic_registration: Some(true),
 910                        ..CompletionClientCapabilities::default()
 911                    }),
 912                    rename: Some(RenameClientCapabilities {
 913                        prepare_support: Some(true),
 914                        prepare_support_default_behavior: Some(
 915                            PrepareSupportDefaultBehavior::IDENTIFIER,
 916                        ),
 917                        dynamic_registration: Some(true),
 918                        ..RenameClientCapabilities::default()
 919                    }),
 920                    hover: Some(HoverClientCapabilities {
 921                        content_format: Some(vec![MarkupKind::Markdown]),
 922                        dynamic_registration: Some(true),
 923                    }),
 924                    inlay_hint: Some(InlayHintClientCapabilities {
 925                        resolve_support: Some(InlayHintResolveClientCapabilities {
 926                            properties: vec![
 927                                "textEdits".to_string(),
 928                                "tooltip".to_string(),
 929                                "label.tooltip".to_string(),
 930                                "label.location".to_string(),
 931                                "label.command".to_string(),
 932                            ],
 933                        }),
 934                        dynamic_registration: Some(true),
 935                    }),
 936                    semantic_tokens: Some(SemanticTokensClientCapabilities {
 937                        dynamic_registration: Some(false),
 938                        requests: SemanticTokensClientCapabilitiesRequests {
 939                            range: None,
 940                            full: Some(SemanticTokensFullOptions::Delta { delta: Some(true) }),
 941                        },
 942                        token_types: SEMANTIC_TOKEN_TYPES.to_vec(),
 943                        token_modifiers: SEMANTIC_TOKEN_MODIFIERS.to_vec(),
 944                        formats: vec![TokenFormat::RELATIVE],
 945                        overlapping_token_support: Some(true),
 946                        multiline_token_support: Some(true),
 947                        server_cancel_support: Some(true),
 948                        augments_syntax_tokens: Some(augments_syntax_tokens),
 949                    }),
 950                    publish_diagnostics: Some(PublishDiagnosticsClientCapabilities {
 951                        related_information: Some(true),
 952                        version_support: Some(true),
 953                        data_support: Some(true),
 954                        tag_support: Some(TagSupport {
 955                            value_set: vec![DiagnosticTag::UNNECESSARY, DiagnosticTag::DEPRECATED],
 956                        }),
 957                        code_description_support: Some(true),
 958                    }),
 959                    formatting: Some(DynamicRegistrationClientCapabilities {
 960                        dynamic_registration: Some(true),
 961                    }),
 962                    range_formatting: Some(DynamicRegistrationClientCapabilities {
 963                        dynamic_registration: Some(true),
 964                    }),
 965                    on_type_formatting: Some(DynamicRegistrationClientCapabilities {
 966                        dynamic_registration: Some(true),
 967                    }),
 968                    signature_help: Some(SignatureHelpClientCapabilities {
 969                        signature_information: Some(SignatureInformationSettings {
 970                            documentation_format: Some(vec![
 971                                MarkupKind::Markdown,
 972                                MarkupKind::PlainText,
 973                            ]),
 974                            parameter_information: Some(ParameterInformationSettings {
 975                                label_offset_support: Some(true),
 976                            }),
 977                            active_parameter_support: Some(true),
 978                        }),
 979                        dynamic_registration: Some(true),
 980                        ..SignatureHelpClientCapabilities::default()
 981                    }),
 982                    synchronization: Some(TextDocumentSyncClientCapabilities {
 983                        did_save: Some(true),
 984                        dynamic_registration: Some(true),
 985                        ..TextDocumentSyncClientCapabilities::default()
 986                    }),
 987                    code_lens: Some(CodeLensClientCapabilities {
 988                        dynamic_registration: Some(true),
 989                    }),
 990                    document_symbol: Some(DocumentSymbolClientCapabilities {
 991                        hierarchical_document_symbol_support: Some(true),
 992                        dynamic_registration: Some(true),
 993                        ..DocumentSymbolClientCapabilities::default()
 994                    }),
 995                    diagnostic: Some(DiagnosticClientCapabilities {
 996                        dynamic_registration: Some(true),
 997                        related_document_support: Some(true),
 998                    })
 999                    .filter(|_| pull_diagnostics),
1000                    color_provider: Some(DocumentColorClientCapabilities {
1001                        dynamic_registration: Some(true),
1002                    }),
1003                    folding_range: Some(FoldingRangeClientCapabilities {
1004                        dynamic_registration: Some(true),
1005                        line_folding_only: Some(false),
1006                        range_limit: None,
1007                        folding_range: Some(FoldingRangeCapability {
1008                            collapsed_text: Some(true),
1009                        }),
1010                        folding_range_kind: Some(FoldingRangeKindCapability {
1011                            value_set: Some(vec![
1012                                FoldingRangeKind::Comment,
1013                                FoldingRangeKind::Region,
1014                                FoldingRangeKind::Imports,
1015                            ]),
1016                        }),
1017                    }),
1018                    ..TextDocumentClientCapabilities::default()
1019                }),
1020                experimental: Some(json!({
1021                    "serverStatusNotification": true,
1022                    "localDocs": true,
1023                })),
1024                window: Some(WindowClientCapabilities {
1025                    work_done_progress: Some(true),
1026                    show_message: Some(ShowMessageRequestClientCapabilities {
1027                        message_action_item: Some(MessageActionItemCapabilities {
1028                            additional_properties_support: Some(true),
1029                        }),
1030                    }),
1031                    ..WindowClientCapabilities::default()
1032                }),
1033            },
1034            trace: None,
1035            workspace_folders: Some(workspace_folders),
1036            client_info: release_channel::ReleaseChannel::try_global(cx).map(|release_channel| {
1037                ClientInfo {
1038                    name: release_channel.display_name().to_string(),
1039                    version: Some(release_channel::AppVersion::global(cx).to_string()),
1040                }
1041            }),
1042            locale: None,
1043            ..InitializeParams::default()
1044        }
1045    }
1046
1047    /// Initializes a language server by sending the `Initialize` request.
1048    /// Note that `options` is used directly to construct [`InitializeParams`], which is why it is owned.
1049    ///
1050    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#initialize)
1051    pub fn initialize(
1052        mut self,
1053        params: InitializeParams,
1054        configuration: Arc<DidChangeConfigurationParams>,
1055        timeout: Duration,
1056        cx: &App,
1057    ) -> Task<Result<Arc<Self>>> {
1058        cx.background_spawn(async move {
1059            let response = self
1060                .request::<request::Initialize>(params, timeout)
1061                .await
1062                .into_response()
1063                .with_context(|| {
1064                    format!(
1065                        "initializing server {}, id {}",
1066                        self.name(),
1067                        self.server_id()
1068                    )
1069                })?;
1070            if let Some(info) = response.server_info {
1071                self.version = info.version.map(SharedString::from);
1072                self.process_name = info.name.into();
1073            }
1074            self.capabilities = RwLock::new(response.capabilities);
1075            self.configuration = configuration;
1076
1077            self.notify::<notification::Initialized>(InitializedParams {})?;
1078            Ok(Arc::new(self))
1079        })
1080    }
1081
1082    /// Sends a shutdown request to the language server process and prepares the [`LanguageServer`] to be dropped.
1083    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>> + use<>> {
1084        let tasks = self.io_tasks.lock().take()?;
1085
1086        let response_handlers = self.response_handlers.clone();
1087        let next_id = AtomicI32::new(self.next_id.load(SeqCst));
1088        let outbound_tx = self.outbound_tx.clone();
1089        let executor = self.executor.clone();
1090        let notification_serializers = self.notification_tx.clone();
1091        let mut output_done = self.output_done_rx.lock().take().unwrap();
1092        let shutdown_request = Self::request_internal::<request::Shutdown>(
1093            &next_id,
1094            &response_handlers,
1095            &outbound_tx,
1096            &notification_serializers,
1097            &executor,
1098            SERVER_SHUTDOWN_TIMEOUT,
1099            (),
1100        );
1101
1102        let server = self.server.clone();
1103        let name = self.name.clone();
1104        let server_id = self.server_id;
1105        let mut timer = self.executor.timer(SERVER_SHUTDOWN_TIMEOUT).fuse();
1106        Some(async move {
1107            log::debug!("language server shutdown started");
1108
1109            select! {
1110                request_result = shutdown_request.fuse() => {
1111                    match request_result {
1112                        ConnectionResult::Timeout => {
1113                            log::warn!("timeout waiting for language server {name} (id {server_id}) to shutdown");
1114                        },
1115                        ConnectionResult::ConnectionReset => {
1116                            log::warn!("language server {name} (id {server_id}) closed the shutdown request connection");
1117                        },
1118                        ConnectionResult::Result(Err(e)) => {
1119                            log::error!("Shutdown request failure, server {name} (id {server_id}): {e:#}");
1120                        },
1121                        ConnectionResult::Result(Ok(())) => {}
1122                    }
1123                }
1124
1125                _ = timer => {
1126                    log::info!("timeout waiting for language server {name} (id {server_id}) to shutdown");
1127                },
1128            }
1129
1130            response_handlers.lock().take();
1131            Self::notify_internal::<notification::Exit>(&notification_serializers, ()).ok();
1132            notification_serializers.close();
1133            output_done.recv().await;
1134            server.lock().take().map(|mut child| child.kill());
1135            drop(tasks);
1136            log::debug!("language server shutdown finished");
1137            Some(())
1138        })
1139    }
1140
1141    /// Register a handler to handle incoming LSP notifications.
1142    ///
1143    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
1144    #[must_use]
1145    pub fn on_notification<T, F>(&self, f: F) -> Subscription
1146    where
1147        T: notification::Notification,
1148        F: 'static + Send + FnMut(T::Params, &mut AsyncApp),
1149    {
1150        self.on_custom_notification(T::METHOD, f)
1151    }
1152
1153    /// Register a handler to handle incoming LSP requests.
1154    ///
1155    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
1156    #[must_use]
1157    pub fn on_request<T, F, Fut>(&self, f: F) -> Subscription
1158    where
1159        T: request::Request,
1160        T::Params: 'static + Send,
1161        F: 'static + FnMut(T::Params, &mut AsyncApp) -> Fut + Send,
1162        Fut: 'static + Future<Output = Result<T::Result>>,
1163    {
1164        self.on_custom_request(T::METHOD, f)
1165    }
1166
1167    /// Registers a handler to inspect all language server process stdio.
1168    #[must_use]
1169    pub fn on_io<F>(&self, f: F) -> Subscription
1170    where
1171        F: 'static + Send + FnMut(IoKind, &str),
1172    {
1173        let id = self.next_id.fetch_add(1, SeqCst);
1174        self.io_handlers.lock().insert(id, Box::new(f));
1175        Subscription::Io {
1176            id,
1177            io_handlers: Some(Arc::downgrade(&self.io_handlers)),
1178        }
1179    }
1180
1181    /// Removes a request handler registers via [`Self::on_request`].
1182    pub fn remove_request_handler<T: request::Request>(&self) {
1183        self.notification_handlers.lock().remove(T::METHOD);
1184    }
1185
1186    /// Removes a notification handler registers via [`Self::on_notification`].
1187    pub fn remove_notification_handler<T: notification::Notification>(&self) {
1188        self.notification_handlers.lock().remove(T::METHOD);
1189    }
1190
1191    /// Checks if a notification handler has been registered via [`Self::on_notification`].
1192    pub fn has_notification_handler<T: notification::Notification>(&self) -> bool {
1193        self.notification_handlers.lock().contains_key(T::METHOD)
1194    }
1195
1196    #[must_use]
1197    fn on_custom_notification<Params, F>(&self, method: &'static str, mut f: F) -> Subscription
1198    where
1199        F: 'static + FnMut(Params, &mut AsyncApp) + Send,
1200        Params: DeserializeOwned,
1201    {
1202        let prev_handler = self.notification_handlers.lock().insert(
1203            method,
1204            Box::new(move |_, params, cx| {
1205                if let Some(params) = serde_json::from_value(params).log_err() {
1206                    f(params, cx);
1207                }
1208            }),
1209        );
1210        assert!(
1211            prev_handler.is_none(),
1212            "registered multiple handlers for the same LSP method"
1213        );
1214        Subscription::Notification {
1215            method,
1216            notification_handlers: Some(self.notification_handlers.clone()),
1217        }
1218    }
1219
1220    #[must_use]
1221    fn on_custom_request<Params, Res, Fut, F>(&self, method: &'static str, mut f: F) -> Subscription
1222    where
1223        F: 'static + FnMut(Params, &mut AsyncApp) -> Fut + Send,
1224        Fut: 'static + Future<Output = Result<Res>>,
1225        Params: DeserializeOwned + Send + 'static,
1226        Res: Serialize,
1227    {
1228        let outbound_tx = self.outbound_tx.clone();
1229        let pending_respond_tasks = self.pending_respond_tasks.clone();
1230        let prev_handler = self.notification_handlers.lock().insert(
1231            method,
1232            Box::new(move |id, params, cx| {
1233                if let Some(id) = id {
1234                    match serde_json::from_value(params) {
1235                        Ok(params) => {
1236                            let response = f(params, cx);
1237                            let task = cx.foreground_executor().spawn({
1238                                let outbound_tx = outbound_tx.clone();
1239                                let pending_respond_tasks = pending_respond_tasks.clone();
1240                                let id = id.clone();
1241                                async move {
1242                                    let response = match response.await {
1243                                        Ok(result) => Response {
1244                                            jsonrpc: JSON_RPC_VERSION,
1245                                            id: id.clone(),
1246                                            value: LspResult::Ok(Some(result)),
1247                                        },
1248                                        Err(error) => Response {
1249                                            jsonrpc: JSON_RPC_VERSION,
1250                                            id: id.clone(),
1251                                            value: LspResult::Error(Some(Error {
1252                                                code: lsp_types::error_codes::REQUEST_FAILED,
1253                                                message: error.to_string(),
1254                                                data: None,
1255                                            })),
1256                                        },
1257                                    };
1258                                    if let Some(response) =
1259                                        serde_json::to_string(&response).log_err()
1260                                    {
1261                                        outbound_tx.try_send(response).ok();
1262                                    }
1263                                    pending_respond_tasks.lock().remove(&id);
1264                                }
1265                            });
1266                            pending_respond_tasks.lock().insert(id, task);
1267                        }
1268
1269                        Err(error) => {
1270                            log::error!("error deserializing {} request: {:?}", method, error);
1271                            let response = AnyResponse {
1272                                jsonrpc: JSON_RPC_VERSION,
1273                                id,
1274                                result: None,
1275                                error: Some(Error {
1276                                    code: -32700, // Parse error
1277                                    message: error.to_string(),
1278                                    data: None,
1279                                }),
1280                            };
1281                            if let Some(response) = serde_json::to_string(&response).log_err() {
1282                                outbound_tx.try_send(response).ok();
1283                            }
1284                        }
1285                    }
1286                }
1287            }),
1288        );
1289        assert!(
1290            prev_handler.is_none(),
1291            "registered multiple handlers for the same LSP method"
1292        );
1293        Subscription::Notification {
1294            method,
1295            notification_handlers: Some(self.notification_handlers.clone()),
1296        }
1297    }
1298
1299    /// Get the name of the running language server.
1300    pub fn name(&self) -> LanguageServerName {
1301        self.name.clone()
1302    }
1303
1304    /// Get the version of the running language server.
1305    pub fn version(&self) -> Option<SharedString> {
1306        self.version.clone()
1307    }
1308
1309    /// Get the process name of the running language server.
1310    pub fn process_name(&self) -> &str {
1311        &self.process_name
1312    }
1313
1314    /// Get the reported capabilities of the running language server.
1315    pub fn capabilities(&self) -> ServerCapabilities {
1316        self.capabilities.read().clone()
1317    }
1318
1319    /// Get the reported capabilities of the running language server and
1320    /// what we know on the client/adapter-side of its capabilities.
1321    pub fn adapter_server_capabilities(&self) -> AdapterServerCapabilities {
1322        AdapterServerCapabilities {
1323            server_capabilities: self.capabilities(),
1324            code_action_kinds: self.code_action_kinds(),
1325        }
1326    }
1327
1328    /// Update the capabilities of the running language server.
1329    pub fn update_capabilities(&self, update: impl FnOnce(&mut ServerCapabilities)) {
1330        update(self.capabilities.write().deref_mut());
1331    }
1332
1333    /// Get the individual configuration settings for the running language server.
1334    /// Does not include globally applied settings (which are stored in ProjectSettings::GlobalLspSettings).
1335    pub fn configuration(&self) -> &Value {
1336        &self.configuration.settings
1337    }
1338
1339    /// Get the ID of the running language server.
1340    pub fn server_id(&self) -> LanguageServerId {
1341        self.server_id
1342    }
1343
1344    /// Get the process ID of the running language server, if available.
1345    pub fn process_id(&self) -> Option<u32> {
1346        self.server.lock().as_ref().map(|child| child.id())
1347    }
1348
1349    /// Get the binary information of the running language server.
1350    pub fn binary(&self) -> &LanguageServerBinary {
1351        &self.binary
1352    }
1353
1354    /// Send a RPC request to the language server.
1355    ///
1356    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
1357    pub fn request<T: request::Request>(
1358        &self,
1359        params: T::Params,
1360        request_timeout: Duration,
1361    ) -> impl LspRequestFuture<T::Result> + use<T>
1362    where
1363        T::Result: 'static + Send,
1364    {
1365        Self::request_internal::<T>(
1366            &self.next_id,
1367            &self.response_handlers,
1368            &self.outbound_tx,
1369            &self.notification_tx,
1370            &self.executor,
1371            request_timeout,
1372            params,
1373        )
1374    }
1375
1376    /// Send a RPC request to the language server with a custom timer.
1377    /// Once the attached future becomes ready, the request will time out with the provided output message.
1378    ///
1379    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
1380    pub fn request_with_timer<T: request::Request, U: Future<Output = String>>(
1381        &self,
1382        params: T::Params,
1383        timer: U,
1384    ) -> impl LspRequestFuture<T::Result> + use<T, U>
1385    where
1386        T::Result: 'static + Send,
1387    {
1388        Self::request_internal_with_timer::<T, U>(
1389            &self.next_id,
1390            &self.response_handlers,
1391            &self.outbound_tx,
1392            &self.notification_tx,
1393            &self.executor,
1394            timer,
1395            params,
1396        )
1397    }
1398
1399    fn request_internal_with_timer<T, U>(
1400        next_id: &AtomicI32,
1401        response_handlers: &Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
1402        outbound_tx: &channel::Sender<String>,
1403        notification_serializers: &channel::Sender<NotificationSerializer>,
1404        executor: &BackgroundExecutor,
1405        timer: U,
1406        params: T::Params,
1407    ) -> impl LspRequestFuture<T::Result> + use<T, U>
1408    where
1409        T::Result: 'static + Send,
1410        T: request::Request,
1411        U: Future<Output = String>,
1412    {
1413        let id = next_id.fetch_add(1, SeqCst);
1414        let message = serde_json::to_string(&Request {
1415            jsonrpc: JSON_RPC_VERSION,
1416            id: RequestId::Int(id),
1417            method: T::METHOD,
1418            params,
1419        })
1420        .expect("LSP message should be serializable to JSON");
1421
1422        let (tx, rx) = oneshot::channel();
1423        let handle_response = response_handlers
1424            .lock()
1425            .as_mut()
1426            .context("server shut down")
1427            .map(|handlers| {
1428                let executor = executor.clone();
1429                handlers.insert(
1430                    RequestId::Int(id),
1431                    Box::new(move |result| {
1432                        executor
1433                            .spawn(async move {
1434                                let response = match result {
1435                                    Ok(response) => match serde_json::from_str(&response) {
1436                                        Ok(deserialized) => Ok(deserialized),
1437                                        Err(error) => {
1438                                            log::error!("failed to deserialize response from language server: {}. response from language server: {:?}", error, response);
1439                                            Err(error).context("failed to deserialize response")
1440                                        }
1441                                    }
1442                                    Err(error) => Err(anyhow!("{}", error.message)),
1443                                };
1444                                tx.send(response).ok();
1445                            })
1446                    }),
1447                );
1448            });
1449
1450        let send = outbound_tx
1451            .try_send(message)
1452            .context("failed to write to language server's stdin");
1453
1454        let response_handlers = Arc::clone(response_handlers);
1455        let notification_serializers = notification_serializers.downgrade();
1456        let started = Instant::now();
1457        LspRequest::new(id, async move {
1458            if let Err(e) = handle_response {
1459                return ConnectionResult::Result(Err(e));
1460            }
1461            if let Err(e) = send {
1462                return ConnectionResult::Result(Err(e));
1463            }
1464
1465            let cancel_on_drop = util::defer(move || {
1466                if let Some(notification_serializers) = notification_serializers.upgrade() {
1467                    Self::notify_internal::<notification::Cancel>(
1468                        &notification_serializers,
1469                        CancelParams {
1470                            id: NumberOrString::Number(id),
1471                        },
1472                    )
1473                    .ok();
1474                }
1475            });
1476
1477            let method = T::METHOD;
1478            select! {
1479                response = rx.fuse() => {
1480                    let elapsed = started.elapsed();
1481                    log::trace!("Took {elapsed:?} to receive response to {method:?} id {id}");
1482                    cancel_on_drop.abort();
1483                    match response {
1484                        Ok(response_result) => ConnectionResult::Result(response_result),
1485                        Err(Canceled) => {
1486                            log::error!("Server reset connection for a request {method:?} id {id}");
1487                            ConnectionResult::ConnectionReset
1488                        },
1489                    }
1490                }
1491
1492                message = timer.fuse() => {
1493                    log::error!("Cancelled LSP request task for {method:?} id {id} {message}");
1494                    match response_handlers
1495                        .lock()
1496                        .as_mut()
1497                        .context("server shut down") {
1498                            Ok(handlers) => {
1499                                handlers.remove(&RequestId::Int(id));
1500                                ConnectionResult::Timeout
1501                            }
1502                            Err(e) => ConnectionResult::Result(Err(e)),
1503                        }
1504                }
1505            }
1506        })
1507    }
1508
1509    fn request_internal<T>(
1510        next_id: &AtomicI32,
1511        response_handlers: &Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
1512        outbound_tx: &channel::Sender<String>,
1513        notification_serializers: &channel::Sender<NotificationSerializer>,
1514        executor: &BackgroundExecutor,
1515        request_timeout: Duration,
1516        params: T::Params,
1517    ) -> impl LspRequestFuture<T::Result> + use<T>
1518    where
1519        T::Result: 'static + Send,
1520        T: request::Request,
1521    {
1522        Self::request_internal_with_timer::<T, _>(
1523            next_id,
1524            response_handlers,
1525            outbound_tx,
1526            notification_serializers,
1527            executor,
1528            Self::request_timeout_future(executor.clone(), request_timeout),
1529            params,
1530        )
1531    }
1532
1533    /// Internal function to return a Future from a configured timeout duration.
1534    /// If the duration is zero or `Duration::MAX`, the returned future never completes.
1535    fn request_timeout_future(
1536        executor: BackgroundExecutor,
1537        request_timeout: Duration,
1538    ) -> impl Future<Output = String> {
1539        if request_timeout == Duration::MAX || request_timeout == Duration::ZERO {
1540            return Either::Left(future::pending::<String>());
1541        }
1542
1543        Either::Right(
1544            executor
1545                .timer(request_timeout)
1546                .map(move |_| format!("which took over {request_timeout:?}")),
1547        )
1548    }
1549
1550    /// Obtain a request timer for the LSP.
1551    pub fn request_timer(&self, timeout: Duration) -> impl Future<Output = String> {
1552        Self::request_timeout_future(self.executor.clone(), timeout)
1553    }
1554
1555    /// Sends a RPC notification to the language server.
1556    ///
1557    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
1558    pub fn notify<T: notification::Notification>(&self, params: T::Params) -> Result<()> {
1559        let outbound = self.notification_tx.clone();
1560        Self::notify_internal::<T>(&outbound, params)
1561    }
1562
1563    fn notify_internal<T: notification::Notification>(
1564        outbound_tx: &channel::Sender<NotificationSerializer>,
1565        params: T::Params,
1566    ) -> Result<()> {
1567        let serializer = NotificationSerializer(Box::new(move || {
1568            serde_json::to_string(&Notification {
1569                jsonrpc: JSON_RPC_VERSION,
1570                method: T::METHOD,
1571                params,
1572            })
1573            .unwrap()
1574        }));
1575
1576        outbound_tx.send_blocking(serializer)?;
1577        Ok(())
1578    }
1579
1580    /// Add new workspace folder to the list.
1581    pub fn add_workspace_folder(&self, uri: Uri) {
1582        if self
1583            .capabilities()
1584            .workspace
1585            .and_then(|ws| {
1586                ws.workspace_folders.and_then(|folders| {
1587                    folders
1588                        .change_notifications
1589                        .map(|caps| matches!(caps, OneOf::Left(false)))
1590                })
1591            })
1592            .unwrap_or(true)
1593        {
1594            return;
1595        }
1596
1597        let Some(workspace_folders) = self.workspace_folders.as_ref() else {
1598            return;
1599        };
1600        let is_new_folder = workspace_folders.lock().insert(uri.clone());
1601        if is_new_folder {
1602            let params = DidChangeWorkspaceFoldersParams {
1603                event: WorkspaceFoldersChangeEvent {
1604                    added: vec![WorkspaceFolder {
1605                        uri,
1606                        name: String::default(),
1607                    }],
1608                    removed: vec![],
1609                },
1610            };
1611            self.notify::<DidChangeWorkspaceFolders>(params).ok();
1612        }
1613    }
1614
1615    /// Remove existing workspace folder from the list.
1616    pub fn remove_workspace_folder(&self, uri: Uri) {
1617        if self
1618            .capabilities()
1619            .workspace
1620            .and_then(|ws| {
1621                ws.workspace_folders.and_then(|folders| {
1622                    folders
1623                        .change_notifications
1624                        .map(|caps| !matches!(caps, OneOf::Left(false)))
1625                })
1626            })
1627            .unwrap_or(true)
1628        {
1629            return;
1630        }
1631        let Some(workspace_folders) = self.workspace_folders.as_ref() else {
1632            return;
1633        };
1634        let was_removed = workspace_folders.lock().remove(&uri);
1635        if was_removed {
1636            let params = DidChangeWorkspaceFoldersParams {
1637                event: WorkspaceFoldersChangeEvent {
1638                    added: vec![],
1639                    removed: vec![WorkspaceFolder {
1640                        uri,
1641                        name: String::default(),
1642                    }],
1643                },
1644            };
1645            self.notify::<DidChangeWorkspaceFolders>(params).ok();
1646        }
1647    }
1648    pub fn set_workspace_folders(&self, folders: BTreeSet<Uri>) {
1649        let Some(workspace_folders) = self.workspace_folders.as_ref() else {
1650            return;
1651        };
1652        let mut workspace_folders = workspace_folders.lock();
1653
1654        let old_workspace_folders = std::mem::take(&mut *workspace_folders);
1655        let added: Vec<_> = folders
1656            .difference(&old_workspace_folders)
1657            .map(|uri| WorkspaceFolder {
1658                uri: uri.clone(),
1659                name: String::default(),
1660            })
1661            .collect();
1662
1663        let removed: Vec<_> = old_workspace_folders
1664            .difference(&folders)
1665            .map(|uri| WorkspaceFolder {
1666                uri: uri.clone(),
1667                name: String::default(),
1668            })
1669            .collect();
1670        *workspace_folders = folders;
1671        let should_notify = !added.is_empty() || !removed.is_empty();
1672        if should_notify {
1673            drop(workspace_folders);
1674            let params = DidChangeWorkspaceFoldersParams {
1675                event: WorkspaceFoldersChangeEvent { added, removed },
1676            };
1677            self.notify::<DidChangeWorkspaceFolders>(params).ok();
1678        }
1679    }
1680
1681    pub fn workspace_folders(&self) -> BTreeSet<Uri> {
1682        self.workspace_folders.as_ref().map_or_else(
1683            || BTreeSet::from_iter([self.root_uri.clone()]),
1684            |folders| folders.lock().clone(),
1685        )
1686    }
1687
1688    pub fn register_buffer(
1689        &self,
1690        uri: Uri,
1691        language_id: String,
1692        version: i32,
1693        initial_text: String,
1694    ) {
1695        self.notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
1696            text_document: TextDocumentItem::new(uri, language_id, version, initial_text),
1697        })
1698        .ok();
1699    }
1700
1701    pub fn unregister_buffer(&self, uri: Uri) {
1702        self.notify::<notification::DidCloseTextDocument>(DidCloseTextDocumentParams {
1703            text_document: TextDocumentIdentifier::new(uri),
1704        })
1705        .ok();
1706    }
1707}
1708
1709impl Drop for LanguageServer {
1710    fn drop(&mut self) {
1711        if let Some(shutdown) = self.shutdown() {
1712            self.executor.spawn(shutdown).detach();
1713        }
1714    }
1715}
1716
1717impl Subscription {
1718    /// Detaching a subscription handle prevents it from unsubscribing on drop.
1719    pub fn detach(&mut self) {
1720        match self {
1721            Subscription::Notification {
1722                notification_handlers,
1723                ..
1724            } => *notification_handlers = None,
1725            Subscription::Io { io_handlers, .. } => *io_handlers = None,
1726        }
1727    }
1728}
1729
1730impl fmt::Display for LanguageServerId {
1731    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1732        self.0.fmt(f)
1733    }
1734}
1735
1736impl fmt::Debug for LanguageServer {
1737    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1738        f.debug_struct("LanguageServer")
1739            .field("id", &self.server_id.0)
1740            .field("name", &self.name)
1741            .finish_non_exhaustive()
1742    }
1743}
1744
1745impl fmt::Debug for LanguageServerBinary {
1746    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1747        let mut debug = f.debug_struct("LanguageServerBinary");
1748        debug.field("path", &self.path);
1749        debug.field("arguments", &self.arguments);
1750
1751        if let Some(env) = &self.env {
1752            let redacted_env: BTreeMap<String, String> = env
1753                .iter()
1754                .map(|(key, value)| {
1755                    let redacted_value = if redact::should_redact(key) {
1756                        "REDACTED".to_string()
1757                    } else {
1758                        value.clone()
1759                    };
1760                    (key.clone(), redacted_value)
1761                })
1762                .collect();
1763            debug.field("env", &Some(redacted_env));
1764        } else {
1765            debug.field("env", &self.env);
1766        }
1767
1768        debug.finish()
1769    }
1770}
1771
1772impl Drop for Subscription {
1773    fn drop(&mut self) {
1774        match self {
1775            Subscription::Notification {
1776                method,
1777                notification_handlers,
1778            } => {
1779                if let Some(handlers) = notification_handlers {
1780                    handlers.lock().remove(method);
1781                }
1782            }
1783            Subscription::Io { id, io_handlers } => {
1784                if let Some(io_handlers) = io_handlers.as_ref().and_then(|h| h.upgrade()) {
1785                    io_handlers.lock().remove(id);
1786                }
1787            }
1788        }
1789    }
1790}
1791
1792/// Mock language server for use in tests.
1793#[cfg(any(test, feature = "test-support"))]
1794#[derive(Clone)]
1795pub struct FakeLanguageServer {
1796    pub binary: LanguageServerBinary,
1797    pub server: Arc<LanguageServer>,
1798    notifications_rx: channel::Receiver<(String, String)>,
1799}
1800
1801#[cfg(any(test, feature = "test-support"))]
1802impl FakeLanguageServer {
1803    /// Construct a fake language server.
1804    pub fn new(
1805        server_id: LanguageServerId,
1806        binary: LanguageServerBinary,
1807        name: String,
1808        capabilities: ServerCapabilities,
1809        cx: &mut AsyncApp,
1810    ) -> (LanguageServer, FakeLanguageServer) {
1811        let (stdin_writer, stdin_reader) = async_pipe::pipe();
1812        let (stdout_writer, stdout_reader) = async_pipe::pipe();
1813        let (notifications_tx, notifications_rx) = channel::unbounded();
1814
1815        let server_name = LanguageServerName(name.clone().into());
1816        let process_name = Arc::from(name.as_str());
1817        let root = Self::root_path();
1818        let workspace_folders: Arc<Mutex<BTreeSet<Uri>>> = Default::default();
1819        let mut server = LanguageServer::new_internal(
1820            server_id,
1821            server_name.clone(),
1822            stdin_writer,
1823            stdout_reader,
1824            None::<async_pipe::PipeReader>,
1825            Arc::new(Mutex::new(None)),
1826            None,
1827            None,
1828            binary.clone(),
1829            root,
1830            Some(workspace_folders.clone()),
1831            cx,
1832            |_| false,
1833        );
1834        server.process_name = process_name;
1835        let fake = FakeLanguageServer {
1836            binary: binary.clone(),
1837            server: Arc::new({
1838                let mut server = LanguageServer::new_internal(
1839                    server_id,
1840                    server_name,
1841                    stdout_writer,
1842                    stdin_reader,
1843                    None::<async_pipe::PipeReader>,
1844                    Arc::new(Mutex::new(None)),
1845                    None,
1846                    None,
1847                    binary,
1848                    Self::root_path(),
1849                    Some(workspace_folders),
1850                    cx,
1851                    move |msg| {
1852                        notifications_tx
1853                            .try_send((
1854                                msg.method.to_string(),
1855                                msg.params.as_ref().unwrap_or(&Value::Null).to_string(),
1856                            ))
1857                            .ok();
1858                        true
1859                    },
1860                );
1861                server.process_name = name.as_str().into();
1862                server
1863            }),
1864            notifications_rx,
1865        };
1866        fake.set_request_handler::<request::Initialize, _, _>({
1867            let capabilities = capabilities;
1868            move |_, _| {
1869                let capabilities = capabilities.clone();
1870                let name = name.clone();
1871                async move {
1872                    Ok(InitializeResult {
1873                        capabilities,
1874                        server_info: Some(ServerInfo {
1875                            name,
1876                            ..Default::default()
1877                        }),
1878                    })
1879                }
1880            }
1881        });
1882
1883        fake.set_request_handler::<request::Shutdown, _, _>(|_, _| async move { Ok(()) });
1884
1885        (server, fake)
1886    }
1887    #[cfg(target_os = "windows")]
1888    fn root_path() -> Uri {
1889        Uri::from_file_path("C:/").unwrap()
1890    }
1891
1892    #[cfg(not(target_os = "windows"))]
1893    fn root_path() -> Uri {
1894        Uri::from_file_path("/").unwrap()
1895    }
1896}
1897
1898#[cfg(any(test, feature = "test-support"))]
1899impl LanguageServer {
1900    pub fn full_capabilities() -> ServerCapabilities {
1901        ServerCapabilities {
1902            document_highlight_provider: Some(OneOf::Left(true)),
1903            code_action_provider: Some(CodeActionProviderCapability::Simple(true)),
1904            document_formatting_provider: Some(OneOf::Left(true)),
1905            document_range_formatting_provider: Some(OneOf::Left(true)),
1906            definition_provider: Some(OneOf::Left(true)),
1907            workspace_symbol_provider: Some(OneOf::Left(true)),
1908            implementation_provider: Some(ImplementationProviderCapability::Simple(true)),
1909            type_definition_provider: Some(TypeDefinitionProviderCapability::Simple(true)),
1910            ..ServerCapabilities::default()
1911        }
1912    }
1913}
1914
1915#[cfg(any(test, feature = "test-support"))]
1916impl FakeLanguageServer {
1917    /// See [`LanguageServer::notify`].
1918    pub fn notify<T: notification::Notification>(&self, params: T::Params) {
1919        self.server.notify::<T>(params).ok();
1920    }
1921
1922    /// See [`LanguageServer::request`].
1923    pub async fn request<T>(
1924        &self,
1925        params: T::Params,
1926        timeout: Duration,
1927    ) -> ConnectionResult<T::Result>
1928    where
1929        T: request::Request,
1930        T::Result: 'static + Send,
1931    {
1932        self.server.request::<T>(params, timeout).await
1933    }
1934
1935    /// Attempts [`Self::try_receive_notification`], unwrapping if it has not received the specified type yet.
1936    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
1937        self.try_receive_notification::<T>().await.unwrap()
1938    }
1939
1940    /// Consumes the notification channel until it finds a notification for the specified type.
1941    pub async fn try_receive_notification<T: notification::Notification>(
1942        &mut self,
1943    ) -> Option<T::Params> {
1944        loop {
1945            let (method, params) = self.notifications_rx.recv().await.ok()?;
1946            if method == T::METHOD {
1947                return Some(serde_json::from_str::<T::Params>(&params).unwrap());
1948            } else {
1949                log::info!("skipping message in fake language server {:?}", params);
1950            }
1951        }
1952    }
1953
1954    /// Registers a handler for a specific kind of request. Removes any existing handler for specified request type.
1955    pub fn set_request_handler<T, F, Fut>(
1956        &self,
1957        mut handler: F,
1958    ) -> futures::channel::mpsc::UnboundedReceiver<()>
1959    where
1960        T: 'static + request::Request,
1961        T::Params: 'static + Send,
1962        F: 'static + Send + FnMut(T::Params, gpui::AsyncApp) -> Fut,
1963        Fut: 'static + Future<Output = Result<T::Result>>,
1964    {
1965        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
1966        self.server.remove_request_handler::<T>();
1967        self.server
1968            .on_request::<T, _, _>(move |params, cx| {
1969                let result = handler(params, cx.clone());
1970                let responded_tx = responded_tx.clone();
1971                let executor = cx.background_executor().clone();
1972                async move {
1973                    executor.simulate_random_delay().await;
1974                    let result = result.await;
1975                    responded_tx.unbounded_send(()).ok();
1976                    result
1977                }
1978            })
1979            .detach();
1980        responded_rx
1981    }
1982
1983    /// Registers a handler for a specific kind of notification. Removes any existing handler for specified notification type.
1984    pub fn handle_notification<T, F>(
1985        &self,
1986        mut handler: F,
1987    ) -> futures::channel::mpsc::UnboundedReceiver<()>
1988    where
1989        T: 'static + notification::Notification,
1990        T::Params: 'static + Send,
1991        F: 'static + Send + FnMut(T::Params, gpui::AsyncApp),
1992    {
1993        let (handled_tx, handled_rx) = futures::channel::mpsc::unbounded();
1994        self.server.remove_notification_handler::<T>();
1995        self.server
1996            .on_notification::<T, _>(move |params, cx| {
1997                handler(params, cx.clone());
1998                handled_tx.unbounded_send(()).ok();
1999            })
2000            .detach();
2001        handled_rx
2002    }
2003
2004    /// Removes any existing handler for specified notification type.
2005    pub fn remove_request_handler<T>(&mut self)
2006    where
2007        T: 'static + request::Request,
2008    {
2009        self.server.remove_request_handler::<T>();
2010    }
2011
2012    /// Simulate that the server has started work and notifies about its progress with the specified token.
2013    pub async fn start_progress(&self, token: impl Into<String>) {
2014        self.start_progress_with(token, Default::default(), Default::default())
2015            .await
2016    }
2017
2018    pub async fn start_progress_with(
2019        &self,
2020        token: impl Into<String>,
2021        progress: WorkDoneProgressBegin,
2022        request_timeout: Duration,
2023    ) {
2024        let token = token.into();
2025        self.request::<request::WorkDoneProgressCreate>(
2026            WorkDoneProgressCreateParams {
2027                token: NumberOrString::String(token.clone()),
2028            },
2029            request_timeout,
2030        )
2031        .await
2032        .into_response()
2033        .unwrap();
2034        self.notify::<notification::Progress>(ProgressParams {
2035            token: NumberOrString::String(token),
2036            value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(progress)),
2037        });
2038    }
2039
2040    /// Simulate that the server has completed work and notifies about that with the specified token.
2041    pub fn end_progress(&self, token: impl Into<String>) {
2042        self.notify::<notification::Progress>(ProgressParams {
2043            token: NumberOrString::String(token.into()),
2044            value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
2045        });
2046    }
2047}
2048
2049#[cfg(test)]
2050mod tests {
2051    use super::*;
2052    use gpui::TestAppContext;
2053    use std::str::FromStr;
2054
2055    #[ctor::ctor]
2056    fn init_logger() {
2057        zlog::init_test();
2058    }
2059
2060    #[gpui::test]
2061    async fn test_fake(cx: &mut TestAppContext) {
2062        cx.update(|cx| {
2063            release_channel::init(semver::Version::new(0, 0, 0), cx);
2064        });
2065        let (server, mut fake) = FakeLanguageServer::new(
2066            LanguageServerId(0),
2067            LanguageServerBinary {
2068                path: "path/to/language-server".into(),
2069                arguments: vec![],
2070                env: None,
2071            },
2072            "the-lsp".to_string(),
2073            Default::default(),
2074            &mut cx.to_async(),
2075        );
2076
2077        let (message_tx, message_rx) = channel::unbounded();
2078        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
2079        server
2080            .on_notification::<notification::ShowMessage, _>(move |params, _| {
2081                message_tx.try_send(params).unwrap()
2082            })
2083            .detach();
2084        server
2085            .on_notification::<notification::PublishDiagnostics, _>(move |params, _| {
2086                diagnostics_tx.try_send(params).unwrap()
2087            })
2088            .detach();
2089
2090        let server = cx
2091            .update(|cx| {
2092                let params = server.default_initialize_params(false, false, cx);
2093                let configuration = DidChangeConfigurationParams {
2094                    settings: Default::default(),
2095                };
2096                server.initialize(
2097                    params,
2098                    configuration.into(),
2099                    DEFAULT_LSP_REQUEST_TIMEOUT,
2100                    cx,
2101                )
2102            })
2103            .await
2104            .unwrap();
2105        server
2106            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
2107                text_document: TextDocumentItem::new(
2108                    Uri::from_str("file://a/b").unwrap(),
2109                    "rust".to_string(),
2110                    0,
2111                    "".to_string(),
2112                ),
2113            })
2114            .unwrap();
2115        assert_eq!(
2116            fake.receive_notification::<notification::DidOpenTextDocument>()
2117                .await
2118                .text_document
2119                .uri
2120                .as_str(),
2121            "file://a/b"
2122        );
2123
2124        fake.notify::<notification::ShowMessage>(ShowMessageParams {
2125            typ: MessageType::ERROR,
2126            message: "ok".to_string(),
2127        });
2128        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
2129            uri: Uri::from_str("file://b/c").unwrap(),
2130            version: Some(5),
2131            diagnostics: vec![],
2132        });
2133        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
2134        assert_eq!(
2135            diagnostics_rx.recv().await.unwrap().uri.as_str(),
2136            "file://b/c"
2137        );
2138
2139        fake.set_request_handler::<request::Shutdown, _, _>(|_, _| async move { Ok(()) });
2140
2141        drop(server);
2142        cx.run_until_parked();
2143        fake.receive_notification::<notification::Exit>().await;
2144    }
2145
2146    #[gpui::test]
2147    fn test_deserialize_string_digit_id() {
2148        let json = r#"{"jsonrpc":"2.0","id":"2","method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
2149        let notification = serde_json::from_str::<NotificationOrRequest>(json)
2150            .expect("message with string id should be parsed");
2151        let expected_id = RequestId::Str("2".to_string());
2152        assert_eq!(notification.id, Some(expected_id));
2153    }
2154
2155    #[gpui::test]
2156    fn test_deserialize_string_id() {
2157        let json = r#"{"jsonrpc":"2.0","id":"anythingAtAll","method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
2158        let notification = serde_json::from_str::<NotificationOrRequest>(json)
2159            .expect("message with string id should be parsed");
2160        let expected_id = RequestId::Str("anythingAtAll".to_string());
2161        assert_eq!(notification.id, Some(expected_id));
2162    }
2163
2164    #[gpui::test]
2165    fn test_deserialize_int_id() {
2166        let json = r#"{"jsonrpc":"2.0","id":2,"method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
2167        let notification = serde_json::from_str::<NotificationOrRequest>(json)
2168            .expect("message with string id should be parsed");
2169        let expected_id = RequestId::Int(2);
2170        assert_eq!(notification.id, Some(expected_id));
2171    }
2172
2173    #[test]
2174    fn test_serialize_has_no_nulls() {
2175        // Ensure we're not setting both result and error variants. (ticket #10595)
2176        let no_tag = Response::<u32> {
2177            jsonrpc: "",
2178            id: RequestId::Int(0),
2179            value: LspResult::Ok(None),
2180        };
2181        assert_eq!(
2182            serde_json::to_string(&no_tag).unwrap(),
2183            "{\"jsonrpc\":\"\",\"id\":0,\"result\":null}"
2184        );
2185        let no_tag = Response::<u32> {
2186            jsonrpc: "",
2187            id: RequestId::Int(0),
2188            value: LspResult::Error(None),
2189        };
2190        assert_eq!(
2191            serde_json::to_string(&no_tag).unwrap(),
2192            "{\"jsonrpc\":\"\",\"id\":0,\"error\":null}"
2193        );
2194    }
2195
2196    #[gpui::test]
2197    async fn test_initialize_params_has_root_path_and_root_uri(cx: &mut TestAppContext) {
2198        cx.update(|cx| {
2199            release_channel::init(semver::Version::new(0, 0, 0), cx);
2200        });
2201        let (server, _fake) = FakeLanguageServer::new(
2202            LanguageServerId(0),
2203            LanguageServerBinary {
2204                path: "path/to/language-server".into(),
2205                arguments: vec![],
2206                env: None,
2207            },
2208            "test-lsp".to_string(),
2209            Default::default(),
2210            &mut cx.to_async(),
2211        );
2212
2213        let params = cx.update(|cx| server.default_initialize_params(false, false, cx));
2214
2215        #[allow(deprecated)]
2216        let root_uri = params.root_uri.expect("root_uri should be set");
2217        #[allow(deprecated)]
2218        let root_path = params.root_path.expect("root_path should be set");
2219
2220        let expected_path = root_uri
2221            .to_file_path()
2222            .expect("root_uri should be a valid file path");
2223        assert_eq!(
2224            root_path,
2225            expected_path.to_string_lossy(),
2226            "root_path should be derived from root_uri"
2227        );
2228    }
2229}