lsp.rs

   1mod input_handler;
   2
   3pub use lsp_types::request::*;
   4pub use lsp_types::*;
   5
   6use anyhow::{Context as _, Result, anyhow};
   7use collections::{BTreeMap, HashMap};
   8use futures::{
   9    AsyncRead, AsyncWrite, Future, FutureExt,
  10    channel::oneshot::{self, Canceled},
  11    io::BufWriter,
  12    select,
  13};
  14use gpui::{App, AppContext as _, AsyncApp, BackgroundExecutor, SharedString, Task};
  15use notification::DidChangeWorkspaceFolders;
  16use parking_lot::{Mutex, RwLock};
  17use postage::{barrier, prelude::Stream};
  18use schemars::JsonSchema;
  19use serde::{Deserialize, Serialize, de::DeserializeOwned};
  20use serde_json::{Value, json, value::RawValue};
  21use smol::{
  22    channel,
  23    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
  24    process::Child,
  25};
  26
  27use std::{
  28    collections::BTreeSet,
  29    ffi::{OsStr, OsString},
  30    fmt,
  31    io::Write,
  32    ops::DerefMut,
  33    path::PathBuf,
  34    pin::Pin,
  35    sync::{
  36        Arc, Weak,
  37        atomic::{AtomicI32, Ordering::SeqCst},
  38    },
  39    task::Poll,
  40    time::{Duration, Instant},
  41};
  42use std::{path::Path, process::Stdio};
  43use util::{ConnectionResult, ResultExt, TryFutureExt, redact};
  44
  45const JSON_RPC_VERSION: &str = "2.0";
  46const CONTENT_LEN_HEADER: &str = "Content-Length: ";
  47
  48pub const LSP_REQUEST_TIMEOUT: Duration = Duration::from_secs(60 * 2);
  49const SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
  50
  51type NotificationHandler = Box<dyn Send + FnMut(Option<RequestId>, Value, &mut AsyncApp)>;
  52type ResponseHandler = Box<dyn Send + FnOnce(Result<String, Error>)>;
  53type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
  54
  55/// Kind of language server stdio given to an IO handler.
  56#[derive(Debug, Clone, Copy)]
  57pub enum IoKind {
  58    StdOut,
  59    StdIn,
  60    StdErr,
  61}
  62
  63/// Represents a launchable language server. This can either be a standalone binary or the path
  64/// to a runtime with arguments to instruct it to launch the actual language server file.
  65#[derive(Clone)]
  66pub struct LanguageServerBinary {
  67    pub path: PathBuf,
  68    pub arguments: Vec<OsString>,
  69    pub env: Option<HashMap<String, String>>,
  70}
  71
  72/// Configures the search (and installation) of language servers.
  73#[derive(Debug, Clone)]
  74pub struct LanguageServerBinaryOptions {
  75    /// Whether the adapter should look at the users system
  76    pub allow_path_lookup: bool,
  77    /// Whether the adapter should download its own version
  78    pub allow_binary_download: bool,
  79    /// Whether the adapter should download a pre-release version
  80    pub pre_release: bool,
  81}
  82
  83struct NotificationSerializer(Box<dyn FnOnce() -> String + Send + Sync>);
  84
  85/// A running language server process.
  86pub struct LanguageServer {
  87    server_id: LanguageServerId,
  88    next_id: AtomicI32,
  89    outbound_tx: channel::Sender<String>,
  90    notification_tx: channel::Sender<NotificationSerializer>,
  91    name: LanguageServerName,
  92    process_name: Arc<str>,
  93    binary: LanguageServerBinary,
  94    capabilities: RwLock<ServerCapabilities>,
  95    /// Configuration sent to the server, stored for display in the language server logs
  96    /// buffer. This is represented as the message sent to the LSP in order to avoid cloning it (can
  97    /// be large in cases like sending schemas to the json server).
  98    configuration: Arc<DidChangeConfigurationParams>,
  99    code_action_kinds: Option<Vec<CodeActionKind>>,
 100    notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
 101    response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
 102    io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 103    executor: BackgroundExecutor,
 104    #[allow(clippy::type_complexity)]
 105    io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
 106    output_done_rx: Mutex<Option<barrier::Receiver>>,
 107    server: Arc<Mutex<Option<Child>>>,
 108    workspace_folders: Option<Arc<Mutex<BTreeSet<Uri>>>>,
 109    root_uri: Uri,
 110}
 111
 112#[derive(Clone, Debug, PartialEq, Eq, Hash)]
 113pub enum LanguageServerSelector {
 114    Id(LanguageServerId),
 115    Name(LanguageServerName),
 116}
 117
 118/// Identifies a running language server.
 119#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
 120#[repr(transparent)]
 121pub struct LanguageServerId(pub usize);
 122
 123impl LanguageServerId {
 124    pub fn from_proto(id: u64) -> Self {
 125        Self(id as usize)
 126    }
 127
 128    pub fn to_proto(self) -> u64 {
 129        self.0 as u64
 130    }
 131}
 132
 133/// A name of a language server.
 134#[derive(
 135    Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize, JsonSchema,
 136)]
 137#[serde(transparent)]
 138pub struct LanguageServerName(pub SharedString);
 139
 140impl std::fmt::Display for LanguageServerName {
 141    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 142        std::fmt::Display::fmt(&self.0, f)
 143    }
 144}
 145
 146impl AsRef<str> for LanguageServerName {
 147    fn as_ref(&self) -> &str {
 148        self.0.as_ref()
 149    }
 150}
 151
 152impl AsRef<OsStr> for LanguageServerName {
 153    fn as_ref(&self) -> &OsStr {
 154        self.0.as_ref().as_ref()
 155    }
 156}
 157
 158impl LanguageServerName {
 159    pub const fn new_static(s: &'static str) -> Self {
 160        Self(SharedString::new_static(s))
 161    }
 162
 163    pub fn from_proto(s: String) -> Self {
 164        Self(s.into())
 165    }
 166}
 167
 168impl<'a> From<&'a str> for LanguageServerName {
 169    fn from(str: &'a str) -> LanguageServerName {
 170        LanguageServerName(str.to_string().into())
 171    }
 172}
 173
 174impl PartialEq<str> for LanguageServerName {
 175    fn eq(&self, other: &str) -> bool {
 176        self.0 == other
 177    }
 178}
 179
 180/// Handle to a language server RPC activity subscription.
 181pub enum Subscription {
 182    Notification {
 183        method: &'static str,
 184        notification_handlers: Option<Arc<Mutex<HashMap<&'static str, NotificationHandler>>>>,
 185    },
 186    Io {
 187        id: i32,
 188        io_handlers: Option<Weak<Mutex<HashMap<i32, IoHandler>>>>,
 189    },
 190}
 191
 192/// Language server protocol RPC request message ID.
 193///
 194/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
 195#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
 196#[serde(untagged)]
 197pub enum RequestId {
 198    Int(i32),
 199    Str(String),
 200}
 201
 202/// Language server protocol RPC request message.
 203///
 204/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
 205#[derive(Serialize, Deserialize)]
 206pub struct Request<'a, T> {
 207    jsonrpc: &'static str,
 208    id: RequestId,
 209    method: &'a str,
 210    params: T,
 211}
 212
 213/// Language server protocol RPC request response message before it is deserialized into a concrete type.
 214#[derive(Serialize, Deserialize)]
 215struct AnyResponse<'a> {
 216    jsonrpc: &'a str,
 217    id: RequestId,
 218    #[serde(default)]
 219    error: Option<Error>,
 220    #[serde(borrow)]
 221    result: Option<&'a RawValue>,
 222}
 223
 224/// Language server protocol RPC request response message.
 225///
 226/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#responseMessage)
 227#[derive(Serialize)]
 228struct Response<T> {
 229    jsonrpc: &'static str,
 230    id: RequestId,
 231    #[serde(flatten)]
 232    value: LspResult<T>,
 233}
 234
 235#[derive(Serialize)]
 236#[serde(rename_all = "snake_case")]
 237enum LspResult<T> {
 238    #[serde(rename = "result")]
 239    Ok(Option<T>),
 240    Error(Option<Error>),
 241}
 242
 243/// Language server protocol RPC notification message.
 244///
 245/// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
 246#[derive(Serialize, Deserialize)]
 247struct Notification<'a, T> {
 248    jsonrpc: &'static str,
 249    #[serde(borrow)]
 250    method: &'a str,
 251    params: T,
 252}
 253
 254/// Language server RPC notification message before it is deserialized into a concrete type.
 255#[derive(Debug, Clone, Deserialize)]
 256struct NotificationOrRequest {
 257    #[serde(default)]
 258    id: Option<RequestId>,
 259    method: String,
 260    #[serde(default)]
 261    params: Option<Value>,
 262}
 263
 264#[derive(Debug, Serialize, Deserialize)]
 265struct Error {
 266    code: i64,
 267    message: String,
 268    #[serde(default)]
 269    data: Option<serde_json::Value>,
 270}
 271
 272pub trait LspRequestFuture<O>: Future<Output = ConnectionResult<O>> {
 273    fn id(&self) -> i32;
 274}
 275
 276struct LspRequest<F> {
 277    id: i32,
 278    request: F,
 279}
 280
 281impl<F> LspRequest<F> {
 282    pub fn new(id: i32, request: F) -> Self {
 283        Self { id, request }
 284    }
 285}
 286
 287impl<F: Future> Future for LspRequest<F> {
 288    type Output = F::Output;
 289
 290    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
 291        // SAFETY: This is standard pin projection, we're pinned so our fields must be pinned.
 292        let inner = unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().request) };
 293        inner.poll(cx)
 294    }
 295}
 296
 297impl<F, O> LspRequestFuture<O> for LspRequest<F>
 298where
 299    F: Future<Output = ConnectionResult<O>>,
 300{
 301    fn id(&self) -> i32 {
 302        self.id
 303    }
 304}
 305
 306/// Combined capabilities of the server and the adapter.
 307#[derive(Debug)]
 308pub struct AdapterServerCapabilities {
 309    // Reported capabilities by the server
 310    pub server_capabilities: ServerCapabilities,
 311    // List of code actions supported by the LspAdapter matching the server
 312    pub code_action_kinds: Option<Vec<CodeActionKind>>,
 313}
 314
 315impl LanguageServer {
 316    /// Starts a language server process.
 317    pub fn new(
 318        stderr_capture: Arc<Mutex<Option<String>>>,
 319        server_id: LanguageServerId,
 320        server_name: LanguageServerName,
 321        binary: LanguageServerBinary,
 322        root_path: &Path,
 323        code_action_kinds: Option<Vec<CodeActionKind>>,
 324        workspace_folders: Option<Arc<Mutex<BTreeSet<Uri>>>>,
 325        cx: &mut AsyncApp,
 326    ) -> Result<Self> {
 327        let working_dir = if root_path.is_dir() {
 328            root_path
 329        } else {
 330            root_path.parent().unwrap_or_else(|| Path::new("/"))
 331        };
 332        let root_uri = Uri::from_file_path(&working_dir)
 333            .map_err(|()| anyhow!("{working_dir:?} is not a valid URI"))?;
 334
 335        log::info!(
 336            "starting language server process. binary path: {:?}, working directory: {:?}, args: {:?}",
 337            binary.path,
 338            working_dir,
 339            &binary.arguments
 340        );
 341
 342        let mut command = util::command::new_smol_command(&binary.path);
 343        command
 344            .current_dir(working_dir)
 345            .args(&binary.arguments)
 346            .envs(binary.env.clone().unwrap_or_default())
 347            .stdin(Stdio::piped())
 348            .stdout(Stdio::piped())
 349            .stderr(Stdio::piped())
 350            .kill_on_drop(true);
 351        let mut server = command
 352            .spawn()
 353            .with_context(|| format!("failed to spawn command {command:?}",))?;
 354
 355        let stdin = server.stdin.take().unwrap();
 356        let stdout = server.stdout.take().unwrap();
 357        let stderr = server.stderr.take().unwrap();
 358        let server = Self::new_internal(
 359            server_id,
 360            server_name,
 361            stdin,
 362            stdout,
 363            Some(stderr),
 364            stderr_capture,
 365            Some(server),
 366            code_action_kinds,
 367            binary,
 368            root_uri,
 369            workspace_folders,
 370            cx,
 371            move |notification| {
 372                log::info!(
 373                    "Language server with id {} sent unhandled notification {}:\n{}",
 374                    server_id,
 375                    notification.method,
 376                    serde_json::to_string_pretty(&notification.params).unwrap(),
 377                );
 378                false
 379            },
 380        );
 381
 382        Ok(server)
 383    }
 384
 385    fn new_internal<Stdin, Stdout, Stderr, F>(
 386        server_id: LanguageServerId,
 387        server_name: LanguageServerName,
 388        stdin: Stdin,
 389        stdout: Stdout,
 390        stderr: Option<Stderr>,
 391        stderr_capture: Arc<Mutex<Option<String>>>,
 392        server: Option<Child>,
 393        code_action_kinds: Option<Vec<CodeActionKind>>,
 394        binary: LanguageServerBinary,
 395        root_uri: Uri,
 396        workspace_folders: Option<Arc<Mutex<BTreeSet<Uri>>>>,
 397        cx: &mut AsyncApp,
 398        on_unhandled_notification: F,
 399    ) -> Self
 400    where
 401        Stdin: AsyncWrite + Unpin + Send + 'static,
 402        Stdout: AsyncRead + Unpin + Send + 'static,
 403        Stderr: AsyncRead + Unpin + Send + 'static,
 404        F: Fn(&NotificationOrRequest) -> bool + 'static + Send + Sync + Clone,
 405    {
 406        let (outbound_tx, outbound_rx) = channel::unbounded::<String>();
 407        let (output_done_tx, output_done_rx) = barrier::channel();
 408        let notification_handlers =
 409            Arc::new(Mutex::new(HashMap::<_, NotificationHandler>::default()));
 410        let response_handlers =
 411            Arc::new(Mutex::new(Some(HashMap::<_, ResponseHandler>::default())));
 412        let io_handlers = Arc::new(Mutex::new(HashMap::default()));
 413
 414        let stdout_input_task = cx.spawn({
 415            let unhandled_notification_wrapper = {
 416                let response_channel = outbound_tx.clone();
 417                async move |msg: NotificationOrRequest| {
 418                    let did_handle = on_unhandled_notification(&msg);
 419                    if !did_handle && let Some(message_id) = msg.id {
 420                        let response = AnyResponse {
 421                            jsonrpc: JSON_RPC_VERSION,
 422                            id: message_id,
 423                            error: Some(Error {
 424                                code: -32601,
 425                                message: format!("Unrecognized method `{}`", msg.method),
 426                                data: None,
 427                            }),
 428                            result: None,
 429                        };
 430                        if let Ok(response) = serde_json::to_string(&response) {
 431                            response_channel.send(response).await.ok();
 432                        }
 433                    }
 434                }
 435            };
 436            let notification_handlers = notification_handlers.clone();
 437            let response_handlers = response_handlers.clone();
 438            let io_handlers = io_handlers.clone();
 439            async move |cx| {
 440                Self::handle_incoming_messages(
 441                    stdout,
 442                    unhandled_notification_wrapper,
 443                    notification_handlers,
 444                    response_handlers,
 445                    io_handlers,
 446                    cx,
 447                )
 448                .log_err()
 449                .await
 450            }
 451        });
 452        let stderr_input_task = stderr
 453            .map(|stderr| {
 454                let io_handlers = io_handlers.clone();
 455                let stderr_captures = stderr_capture.clone();
 456                cx.background_spawn(async move {
 457                    Self::handle_stderr(stderr, io_handlers, stderr_captures)
 458                        .log_err()
 459                        .await
 460                })
 461            })
 462            .unwrap_or_else(|| Task::ready(None));
 463        let input_task = cx.background_spawn(async move {
 464            let (stdout, stderr) = futures::join!(stdout_input_task, stderr_input_task);
 465            stdout.or(stderr)
 466        });
 467        let output_task = cx.background_spawn({
 468            Self::handle_outgoing_messages(
 469                stdin,
 470                outbound_rx,
 471                output_done_tx,
 472                response_handlers.clone(),
 473                io_handlers.clone(),
 474            )
 475            .log_err()
 476        });
 477
 478        let configuration = DidChangeConfigurationParams {
 479            settings: Value::Null,
 480        }
 481        .into();
 482
 483        let (notification_tx, notification_rx) = channel::unbounded::<NotificationSerializer>();
 484        cx.background_spawn({
 485            let outbound_tx = outbound_tx.clone();
 486            async move {
 487                while let Ok(serializer) = notification_rx.recv().await {
 488                    let serialized = (serializer.0)();
 489                    let Ok(_) = outbound_tx.send(serialized).await else {
 490                        return;
 491                    };
 492                }
 493                outbound_tx.close();
 494            }
 495        })
 496        .detach();
 497        Self {
 498            server_id,
 499            notification_handlers,
 500            notification_tx,
 501            response_handlers,
 502            io_handlers,
 503            name: server_name,
 504            process_name: binary
 505                .path
 506                .file_name()
 507                .map(|name| Arc::from(name.to_string_lossy()))
 508                .unwrap_or_default(),
 509            binary,
 510            capabilities: Default::default(),
 511            configuration,
 512            code_action_kinds,
 513            next_id: Default::default(),
 514            outbound_tx,
 515            executor: cx.background_executor().clone(),
 516            io_tasks: Mutex::new(Some((input_task, output_task))),
 517            output_done_rx: Mutex::new(Some(output_done_rx)),
 518            server: Arc::new(Mutex::new(server)),
 519            workspace_folders,
 520            root_uri,
 521        }
 522    }
 523
 524    /// List of code action kinds this language server reports being able to emit.
 525    pub fn code_action_kinds(&self) -> Option<Vec<CodeActionKind>> {
 526        self.code_action_kinds.clone()
 527    }
 528
 529    async fn handle_incoming_messages<Stdout>(
 530        stdout: Stdout,
 531        on_unhandled_notification: impl AsyncFn(NotificationOrRequest) + 'static + Send,
 532        notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
 533        response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
 534        io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 535        cx: &mut AsyncApp,
 536    ) -> anyhow::Result<()>
 537    where
 538        Stdout: AsyncRead + Unpin + Send + 'static,
 539    {
 540        use smol::stream::StreamExt;
 541        let stdout = BufReader::new(stdout);
 542        let _clear_response_handlers = util::defer({
 543            let response_handlers = response_handlers.clone();
 544            // todo: check if this makes
 545            move || {
 546                if let Some(handlers) = response_handlers.lock().as_mut() {
 547                    handlers.clear();
 548                }
 549            }
 550        });
 551        let mut input_handler = input_handler::LspStdoutHandler::new(
 552            stdout,
 553            response_handlers,
 554            io_handlers,
 555            cx.background_executor().clone(),
 556        );
 557
 558        while let Some(msg) = input_handler.incoming_messages.next().await {
 559            let unhandled_message = {
 560                let mut notification_handlers = notification_handlers.lock();
 561                if let Some(handler) = notification_handlers.get_mut(msg.method.as_str()) {
 562                    handler(msg.id, msg.params.unwrap_or(Value::Null), cx);
 563                    None
 564                } else {
 565                    Some(msg)
 566                }
 567            };
 568
 569            if let Some(msg) = unhandled_message {
 570                on_unhandled_notification(msg).await;
 571            }
 572
 573            // Don't starve the main thread when receiving lots of notifications at once.
 574            smol::future::yield_now().await;
 575        }
 576        input_handler.loop_handle.await
 577    }
 578
 579    async fn handle_stderr<Stderr>(
 580        stderr: Stderr,
 581        io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 582        stderr_capture: Arc<Mutex<Option<String>>>,
 583    ) -> anyhow::Result<()>
 584    where
 585        Stderr: AsyncRead + Unpin + Send + 'static,
 586    {
 587        let mut stderr = BufReader::new(stderr);
 588        let mut buffer = Vec::new();
 589
 590        loop {
 591            buffer.clear();
 592
 593            let bytes_read = stderr.read_until(b'\n', &mut buffer).await?;
 594            if bytes_read == 0 {
 595                return Ok(());
 596            }
 597
 598            if let Ok(message) = std::str::from_utf8(&buffer) {
 599                log::trace!("incoming stderr message:{message}");
 600                for handler in io_handlers.lock().values_mut() {
 601                    handler(IoKind::StdErr, message);
 602                }
 603
 604                if let Some(stderr) = stderr_capture.lock().as_mut() {
 605                    stderr.push_str(message);
 606                }
 607            }
 608
 609            // Don't starve the main thread when receiving lots of messages at once.
 610            smol::future::yield_now().await;
 611        }
 612    }
 613
 614    async fn handle_outgoing_messages<Stdin>(
 615        stdin: Stdin,
 616        outbound_rx: channel::Receiver<String>,
 617        output_done_tx: barrier::Sender,
 618        response_handlers: Arc<Mutex<Option<HashMap<RequestId, ResponseHandler>>>>,
 619        io_handlers: Arc<Mutex<HashMap<i32, IoHandler>>>,
 620    ) -> anyhow::Result<()>
 621    where
 622        Stdin: AsyncWrite + Unpin + Send + 'static,
 623    {
 624        let mut stdin = BufWriter::new(stdin);
 625        let _clear_response_handlers = util::defer({
 626            let response_handlers = response_handlers.clone();
 627            // todo: check if this makes
 628            move || {
 629                if let Some(handlers) = response_handlers.lock().as_mut() {
 630                    handlers.clear();
 631                }
 632            }
 633        });
 634        let mut content_len_buffer = Vec::new();
 635        while let Ok(message) = outbound_rx.recv().await {
 636            log::trace!("outgoing message:{}", message);
 637            for handler in io_handlers.lock().values_mut() {
 638                handler(IoKind::StdIn, &message);
 639            }
 640
 641            content_len_buffer.clear();
 642            write!(content_len_buffer, "{}", message.len()).unwrap();
 643            stdin.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
 644            stdin.write_all(&content_len_buffer).await?;
 645            stdin.write_all("\r\n\r\n".as_bytes()).await?;
 646            stdin.write_all(message.as_bytes()).await?;
 647            stdin.flush().await?;
 648        }
 649        drop(output_done_tx);
 650        Ok(())
 651    }
 652
 653    pub fn default_initialize_params(&self, pull_diagnostics: bool, cx: &App) -> InitializeParams {
 654        let workspace_folders = self.workspace_folders.as_ref().map_or_else(
 655            || {
 656                vec![WorkspaceFolder {
 657                    name: Default::default(),
 658                    uri: self.root_uri.clone(),
 659                }]
 660            },
 661            |folders| {
 662                folders
 663                    .lock()
 664                    .iter()
 665                    .cloned()
 666                    .map(|uri| WorkspaceFolder {
 667                        name: Default::default(),
 668                        uri,
 669                    })
 670                    .collect()
 671            },
 672        );
 673
 674        #[allow(deprecated)]
 675        InitializeParams {
 676            process_id: None,
 677            root_path: None,
 678            root_uri: Some(self.root_uri.clone()),
 679            initialization_options: None,
 680            capabilities: ClientCapabilities {
 681                general: Some(GeneralClientCapabilities {
 682                    position_encodings: Some(vec![PositionEncodingKind::UTF16]),
 683                    ..GeneralClientCapabilities::default()
 684                }),
 685                workspace: Some(WorkspaceClientCapabilities {
 686                    configuration: Some(true),
 687                    did_change_watched_files: Some(DidChangeWatchedFilesClientCapabilities {
 688                        dynamic_registration: Some(true),
 689                        relative_pattern_support: Some(true),
 690                    }),
 691                    did_change_configuration: Some(DynamicRegistrationClientCapabilities {
 692                        dynamic_registration: Some(true),
 693                    }),
 694                    workspace_folders: Some(true),
 695                    symbol: Some(WorkspaceSymbolClientCapabilities {
 696                        resolve_support: None,
 697                        dynamic_registration: Some(true),
 698                        ..WorkspaceSymbolClientCapabilities::default()
 699                    }),
 700                    inlay_hint: Some(InlayHintWorkspaceClientCapabilities {
 701                        refresh_support: Some(true),
 702                    }),
 703                    diagnostics: Some(DiagnosticWorkspaceClientCapabilities {
 704                        refresh_support: Some(true),
 705                    })
 706                    .filter(|_| pull_diagnostics),
 707                    code_lens: Some(CodeLensWorkspaceClientCapabilities {
 708                        refresh_support: Some(true),
 709                    }),
 710                    workspace_edit: Some(WorkspaceEditClientCapabilities {
 711                        resource_operations: Some(vec![
 712                            ResourceOperationKind::Create,
 713                            ResourceOperationKind::Rename,
 714                            ResourceOperationKind::Delete,
 715                        ]),
 716                        document_changes: Some(true),
 717                        snippet_edit_support: Some(true),
 718                        ..WorkspaceEditClientCapabilities::default()
 719                    }),
 720                    file_operations: Some(WorkspaceFileOperationsClientCapabilities {
 721                        dynamic_registration: Some(true),
 722                        did_rename: Some(true),
 723                        will_rename: Some(true),
 724                        ..WorkspaceFileOperationsClientCapabilities::default()
 725                    }),
 726                    apply_edit: Some(true),
 727                    execute_command: Some(ExecuteCommandClientCapabilities {
 728                        dynamic_registration: Some(true),
 729                    }),
 730                    ..WorkspaceClientCapabilities::default()
 731                }),
 732                text_document: Some(TextDocumentClientCapabilities {
 733                    definition: Some(GotoCapability {
 734                        link_support: Some(true),
 735                        dynamic_registration: Some(true),
 736                    }),
 737                    code_action: Some(CodeActionClientCapabilities {
 738                        code_action_literal_support: Some(CodeActionLiteralSupport {
 739                            code_action_kind: CodeActionKindLiteralSupport {
 740                                value_set: vec![
 741                                    CodeActionKind::REFACTOR.as_str().into(),
 742                                    CodeActionKind::QUICKFIX.as_str().into(),
 743                                    CodeActionKind::SOURCE.as_str().into(),
 744                                ],
 745                            },
 746                        }),
 747                        data_support: Some(true),
 748                        resolve_support: Some(CodeActionCapabilityResolveSupport {
 749                            properties: vec![
 750                                "kind".to_string(),
 751                                "diagnostics".to_string(),
 752                                "isPreferred".to_string(),
 753                                "disabled".to_string(),
 754                                "edit".to_string(),
 755                                "command".to_string(),
 756                            ],
 757                        }),
 758                        dynamic_registration: Some(true),
 759                        ..CodeActionClientCapabilities::default()
 760                    }),
 761                    completion: Some(CompletionClientCapabilities {
 762                        completion_item: Some(CompletionItemCapability {
 763                            snippet_support: Some(true),
 764                            resolve_support: Some(CompletionItemCapabilityResolveSupport {
 765                                properties: vec![
 766                                    "additionalTextEdits".to_string(),
 767                                    "command".to_string(),
 768                                    "documentation".to_string(),
 769                                    // NB: Do not have this resolved, otherwise Zed becomes slow to complete things
 770                                    // "textEdit".to_string(),
 771                                ],
 772                            }),
 773                            insert_replace_support: Some(true),
 774                            label_details_support: Some(true),
 775                            insert_text_mode_support: Some(InsertTextModeSupport {
 776                                value_set: vec![
 777                                    InsertTextMode::AS_IS,
 778                                    InsertTextMode::ADJUST_INDENTATION,
 779                                ],
 780                            }),
 781                            documentation_format: Some(vec![
 782                                MarkupKind::Markdown,
 783                                MarkupKind::PlainText,
 784                            ]),
 785                            ..CompletionItemCapability::default()
 786                        }),
 787                        insert_text_mode: Some(InsertTextMode::ADJUST_INDENTATION),
 788                        completion_list: Some(CompletionListCapability {
 789                            item_defaults: Some(vec![
 790                                "commitCharacters".to_owned(),
 791                                "editRange".to_owned(),
 792                                "insertTextMode".to_owned(),
 793                                "insertTextFormat".to_owned(),
 794                                "data".to_owned(),
 795                            ]),
 796                        }),
 797                        context_support: Some(true),
 798                        dynamic_registration: Some(true),
 799                        ..CompletionClientCapabilities::default()
 800                    }),
 801                    rename: Some(RenameClientCapabilities {
 802                        prepare_support: Some(true),
 803                        prepare_support_default_behavior: Some(
 804                            PrepareSupportDefaultBehavior::IDENTIFIER,
 805                        ),
 806                        dynamic_registration: Some(true),
 807                        ..RenameClientCapabilities::default()
 808                    }),
 809                    hover: Some(HoverClientCapabilities {
 810                        content_format: Some(vec![MarkupKind::Markdown]),
 811                        dynamic_registration: Some(true),
 812                    }),
 813                    inlay_hint: Some(InlayHintClientCapabilities {
 814                        resolve_support: Some(InlayHintResolveClientCapabilities {
 815                            properties: vec![
 816                                "textEdits".to_string(),
 817                                "tooltip".to_string(),
 818                                "label.tooltip".to_string(),
 819                                "label.location".to_string(),
 820                                "label.command".to_string(),
 821                            ],
 822                        }),
 823                        dynamic_registration: Some(true),
 824                    }),
 825                    publish_diagnostics: Some(PublishDiagnosticsClientCapabilities {
 826                        related_information: Some(true),
 827                        version_support: Some(true),
 828                        data_support: Some(true),
 829                        tag_support: Some(TagSupport {
 830                            value_set: vec![DiagnosticTag::UNNECESSARY, DiagnosticTag::DEPRECATED],
 831                        }),
 832                        code_description_support: Some(true),
 833                    }),
 834                    formatting: Some(DynamicRegistrationClientCapabilities {
 835                        dynamic_registration: Some(true),
 836                    }),
 837                    range_formatting: Some(DynamicRegistrationClientCapabilities {
 838                        dynamic_registration: Some(true),
 839                    }),
 840                    on_type_formatting: Some(DynamicRegistrationClientCapabilities {
 841                        dynamic_registration: Some(true),
 842                    }),
 843                    signature_help: Some(SignatureHelpClientCapabilities {
 844                        signature_information: Some(SignatureInformationSettings {
 845                            documentation_format: Some(vec![
 846                                MarkupKind::Markdown,
 847                                MarkupKind::PlainText,
 848                            ]),
 849                            parameter_information: Some(ParameterInformationSettings {
 850                                label_offset_support: Some(true),
 851                            }),
 852                            active_parameter_support: Some(true),
 853                        }),
 854                        dynamic_registration: Some(true),
 855                        ..SignatureHelpClientCapabilities::default()
 856                    }),
 857                    synchronization: Some(TextDocumentSyncClientCapabilities {
 858                        did_save: Some(true),
 859                        dynamic_registration: Some(true),
 860                        ..TextDocumentSyncClientCapabilities::default()
 861                    }),
 862                    code_lens: Some(CodeLensClientCapabilities {
 863                        dynamic_registration: Some(true),
 864                    }),
 865                    document_symbol: Some(DocumentSymbolClientCapabilities {
 866                        hierarchical_document_symbol_support: Some(true),
 867                        dynamic_registration: Some(true),
 868                        ..DocumentSymbolClientCapabilities::default()
 869                    }),
 870                    diagnostic: Some(DiagnosticClientCapabilities {
 871                        dynamic_registration: Some(true),
 872                        related_document_support: Some(true),
 873                    })
 874                    .filter(|_| pull_diagnostics),
 875                    color_provider: Some(DocumentColorClientCapabilities {
 876                        dynamic_registration: Some(true),
 877                    }),
 878                    ..TextDocumentClientCapabilities::default()
 879                }),
 880                experimental: Some(json!({
 881                    "serverStatusNotification": true,
 882                    "localDocs": true,
 883                })),
 884                window: Some(WindowClientCapabilities {
 885                    work_done_progress: Some(true),
 886                    show_message: Some(ShowMessageRequestClientCapabilities {
 887                        message_action_item: None,
 888                    }),
 889                    ..WindowClientCapabilities::default()
 890                }),
 891            },
 892            trace: None,
 893            workspace_folders: Some(workspace_folders),
 894            client_info: release_channel::ReleaseChannel::try_global(cx).map(|release_channel| {
 895                ClientInfo {
 896                    name: release_channel.display_name().to_string(),
 897                    version: Some(release_channel::AppVersion::global(cx).to_string()),
 898                }
 899            }),
 900            locale: None,
 901            ..InitializeParams::default()
 902        }
 903    }
 904
 905    /// Initializes a language server by sending the `Initialize` request.
 906    /// Note that `options` is used directly to construct [`InitializeParams`], which is why it is owned.
 907    ///
 908    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#initialize)
 909    pub fn initialize(
 910        mut self,
 911        params: InitializeParams,
 912        configuration: Arc<DidChangeConfigurationParams>,
 913        cx: &App,
 914    ) -> Task<Result<Arc<Self>>> {
 915        cx.background_spawn(async move {
 916            let response = self
 917                .request::<request::Initialize>(params)
 918                .await
 919                .into_response()
 920                .with_context(|| {
 921                    format!(
 922                        "initializing server {}, id {}",
 923                        self.name(),
 924                        self.server_id()
 925                    )
 926                })?;
 927            if let Some(info) = response.server_info {
 928                self.process_name = info.name.into();
 929            }
 930            self.capabilities = RwLock::new(response.capabilities);
 931            self.configuration = configuration;
 932
 933            self.notify::<notification::Initialized>(InitializedParams {})?;
 934            Ok(Arc::new(self))
 935        })
 936    }
 937
 938    /// Sends a shutdown request to the language server process and prepares the [`LanguageServer`] to be dropped.
 939    pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>> + use<>> {
 940        if let Some(tasks) = self.io_tasks.lock().take() {
 941            let response_handlers = self.response_handlers.clone();
 942            let next_id = AtomicI32::new(self.next_id.load(SeqCst));
 943            let outbound_tx = self.outbound_tx.clone();
 944            let executor = self.executor.clone();
 945            let notification_serializers = self.notification_tx.clone();
 946            let mut output_done = self.output_done_rx.lock().take().unwrap();
 947            let shutdown_request = Self::request_internal::<request::Shutdown>(
 948                &next_id,
 949                &response_handlers,
 950                &outbound_tx,
 951                &notification_serializers,
 952                &executor,
 953                (),
 954            );
 955
 956            let server = self.server.clone();
 957            let name = self.name.clone();
 958            let server_id = self.server_id;
 959            let mut timer = self.executor.timer(SERVER_SHUTDOWN_TIMEOUT).fuse();
 960            Some(async move {
 961                log::debug!("language server shutdown started");
 962
 963                select! {
 964                    request_result = shutdown_request.fuse() => {
 965                        match request_result {
 966                            ConnectionResult::Timeout => {
 967                                log::warn!("timeout waiting for language server {name} (id {server_id}) to shutdown");
 968                            },
 969                            ConnectionResult::ConnectionReset => {
 970                                log::warn!("language server {name} (id {server_id}) closed the shutdown request connection");
 971                            },
 972                            ConnectionResult::Result(Err(e)) => {
 973                                log::error!("Shutdown request failure, server {name} (id {server_id}): {e:#}");
 974                            },
 975                            ConnectionResult::Result(Ok(())) => {}
 976                        }
 977                    }
 978
 979                    _ = timer => {
 980                        log::info!("timeout waiting for language server {name} (id {server_id}) to shutdown");
 981                    },
 982                }
 983
 984                response_handlers.lock().take();
 985                Self::notify_internal::<notification::Exit>(&notification_serializers, ()).ok();
 986                notification_serializers.close();
 987                output_done.recv().await;
 988                server.lock().take().map(|mut child| child.kill());
 989                drop(tasks);
 990                log::debug!("language server shutdown finished");
 991                Some(())
 992            })
 993        } else {
 994            None
 995        }
 996    }
 997
 998    /// Register a handler to handle incoming LSP notifications.
 999    ///
1000    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
1001    #[must_use]
1002    pub fn on_notification<T, F>(&self, f: F) -> Subscription
1003    where
1004        T: notification::Notification,
1005        F: 'static + Send + FnMut(T::Params, &mut AsyncApp),
1006    {
1007        self.on_custom_notification(T::METHOD, f)
1008    }
1009
1010    /// Register a handler to handle incoming LSP requests.
1011    ///
1012    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
1013    #[must_use]
1014    pub fn on_request<T, F, Fut>(&self, f: F) -> Subscription
1015    where
1016        T: request::Request,
1017        T::Params: 'static + Send,
1018        F: 'static + FnMut(T::Params, &mut AsyncApp) -> Fut + Send,
1019        Fut: 'static + Future<Output = Result<T::Result>>,
1020    {
1021        self.on_custom_request(T::METHOD, f)
1022    }
1023
1024    /// Registers a handler to inspect all language server process stdio.
1025    #[must_use]
1026    pub fn on_io<F>(&self, f: F) -> Subscription
1027    where
1028        F: 'static + Send + FnMut(IoKind, &str),
1029    {
1030        let id = self.next_id.fetch_add(1, SeqCst);
1031        self.io_handlers.lock().insert(id, Box::new(f));
1032        Subscription::Io {
1033            id,
1034            io_handlers: Some(Arc::downgrade(&self.io_handlers)),
1035        }
1036    }
1037
1038    /// Removes a request handler registers via [`Self::on_request`].
1039    pub fn remove_request_handler<T: request::Request>(&self) {
1040        self.notification_handlers.lock().remove(T::METHOD);
1041    }
1042
1043    /// Removes a notification handler registers via [`Self::on_notification`].
1044    pub fn remove_notification_handler<T: notification::Notification>(&self) {
1045        self.notification_handlers.lock().remove(T::METHOD);
1046    }
1047
1048    /// Checks if a notification handler has been registered via [`Self::on_notification`].
1049    pub fn has_notification_handler<T: notification::Notification>(&self) -> bool {
1050        self.notification_handlers.lock().contains_key(T::METHOD)
1051    }
1052
1053    #[must_use]
1054    fn on_custom_notification<Params, F>(&self, method: &'static str, mut f: F) -> Subscription
1055    where
1056        F: 'static + FnMut(Params, &mut AsyncApp) + Send,
1057        Params: DeserializeOwned,
1058    {
1059        let prev_handler = self.notification_handlers.lock().insert(
1060            method,
1061            Box::new(move |_, params, cx| {
1062                if let Some(params) = serde_json::from_value(params).log_err() {
1063                    f(params, cx);
1064                }
1065            }),
1066        );
1067        assert!(
1068            prev_handler.is_none(),
1069            "registered multiple handlers for the same LSP method"
1070        );
1071        Subscription::Notification {
1072            method,
1073            notification_handlers: Some(self.notification_handlers.clone()),
1074        }
1075    }
1076
1077    #[must_use]
1078    fn on_custom_request<Params, Res, Fut, F>(&self, method: &'static str, mut f: F) -> Subscription
1079    where
1080        F: 'static + FnMut(Params, &mut AsyncApp) -> Fut + Send,
1081        Fut: 'static + Future<Output = Result<Res>>,
1082        Params: DeserializeOwned + Send + 'static,
1083        Res: Serialize,
1084    {
1085        let outbound_tx = self.outbound_tx.clone();
1086        let prev_handler = self.notification_handlers.lock().insert(
1087            method,
1088            Box::new(move |id, params, cx| {
1089                if let Some(id) = id {
1090                    match serde_json::from_value(params) {
1091                        Ok(params) => {
1092                            let response = f(params, cx);
1093                            cx.foreground_executor()
1094                                .spawn({
1095                                    let outbound_tx = outbound_tx.clone();
1096                                    async move {
1097                                        let response = match response.await {
1098                                            Ok(result) => Response {
1099                                                jsonrpc: JSON_RPC_VERSION,
1100                                                id,
1101                                                value: LspResult::Ok(Some(result)),
1102                                            },
1103                                            Err(error) => Response {
1104                                                jsonrpc: JSON_RPC_VERSION,
1105                                                id,
1106                                                value: LspResult::Error(Some(Error {
1107                                                    code: lsp_types::error_codes::REQUEST_FAILED,
1108                                                    message: error.to_string(),
1109                                                    data: None,
1110                                                })),
1111                                            },
1112                                        };
1113                                        if let Some(response) =
1114                                            serde_json::to_string(&response).log_err()
1115                                        {
1116                                            outbound_tx.try_send(response).ok();
1117                                        }
1118                                    }
1119                                })
1120                                .detach();
1121                        }
1122
1123                        Err(error) => {
1124                            log::error!("error deserializing {} request: {:?}", method, error);
1125                            let response = AnyResponse {
1126                                jsonrpc: JSON_RPC_VERSION,
1127                                id,
1128                                result: None,
1129                                error: Some(Error {
1130                                    code: -32700, // Parse error
1131                                    message: error.to_string(),
1132                                    data: None,
1133                                }),
1134                            };
1135                            if let Some(response) = serde_json::to_string(&response).log_err() {
1136                                outbound_tx.try_send(response).ok();
1137                            }
1138                        }
1139                    }
1140                }
1141            }),
1142        );
1143        assert!(
1144            prev_handler.is_none(),
1145            "registered multiple handlers for the same LSP method"
1146        );
1147        Subscription::Notification {
1148            method,
1149            notification_handlers: Some(self.notification_handlers.clone()),
1150        }
1151    }
1152
1153    /// Get the name of the running language server.
1154    pub fn name(&self) -> LanguageServerName {
1155        self.name.clone()
1156    }
1157
1158    pub fn process_name(&self) -> &str {
1159        &self.process_name
1160    }
1161
1162    /// Get the reported capabilities of the running language server.
1163    pub fn capabilities(&self) -> ServerCapabilities {
1164        self.capabilities.read().clone()
1165    }
1166
1167    /// Get the reported capabilities of the running language server and
1168    /// what we know on the client/adapter-side of its capabilities.
1169    pub fn adapter_server_capabilities(&self) -> AdapterServerCapabilities {
1170        AdapterServerCapabilities {
1171            server_capabilities: self.capabilities(),
1172            code_action_kinds: self.code_action_kinds(),
1173        }
1174    }
1175
1176    pub fn update_capabilities(&self, update: impl FnOnce(&mut ServerCapabilities)) {
1177        update(self.capabilities.write().deref_mut());
1178    }
1179
1180    pub fn configuration(&self) -> &Value {
1181        &self.configuration.settings
1182    }
1183
1184    /// Get the id of the running language server.
1185    pub fn server_id(&self) -> LanguageServerId {
1186        self.server_id
1187    }
1188
1189    /// Language server's binary information.
1190    pub fn binary(&self) -> &LanguageServerBinary {
1191        &self.binary
1192    }
1193
1194    /// Sends a RPC request to the language server.
1195    ///
1196    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
1197    pub fn request<T: request::Request>(
1198        &self,
1199        params: T::Params,
1200    ) -> impl LspRequestFuture<T::Result> + use<T>
1201    where
1202        T::Result: 'static + Send,
1203    {
1204        Self::request_internal::<T>(
1205            &self.next_id,
1206            &self.response_handlers,
1207            &self.outbound_tx,
1208            &self.notification_tx,
1209            &self.executor,
1210            params,
1211        )
1212    }
1213
1214    /// Sends a RPC request to the language server, with a custom timer, a future which when becoming
1215    /// ready causes the request to be timed out with the future's output message.
1216    ///
1217    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#requestMessage)
1218    pub fn request_with_timer<T: request::Request, U: Future<Output = String>>(
1219        &self,
1220        params: T::Params,
1221        timer: U,
1222    ) -> impl LspRequestFuture<T::Result> + use<T, U>
1223    where
1224        T::Result: 'static + Send,
1225    {
1226        Self::request_internal_with_timer::<T, U>(
1227            &self.next_id,
1228            &self.response_handlers,
1229            &self.outbound_tx,
1230            &self.notification_tx,
1231            &self.executor,
1232            timer,
1233            params,
1234        )
1235    }
1236
1237    fn request_internal_with_timer<T, U>(
1238        next_id: &AtomicI32,
1239        response_handlers: &Mutex<Option<HashMap<RequestId, ResponseHandler>>>,
1240        outbound_tx: &channel::Sender<String>,
1241        notification_serializers: &channel::Sender<NotificationSerializer>,
1242        executor: &BackgroundExecutor,
1243        timer: U,
1244        params: T::Params,
1245    ) -> impl LspRequestFuture<T::Result> + use<T, U>
1246    where
1247        T::Result: 'static + Send,
1248        T: request::Request,
1249        U: Future<Output = String>,
1250    {
1251        let id = next_id.fetch_add(1, SeqCst);
1252        let message = serde_json::to_string(&Request {
1253            jsonrpc: JSON_RPC_VERSION,
1254            id: RequestId::Int(id),
1255            method: T::METHOD,
1256            params,
1257        })
1258        .unwrap();
1259
1260        let (tx, rx) = oneshot::channel();
1261        let handle_response = response_handlers
1262            .lock()
1263            .as_mut()
1264            .context("server shut down")
1265            .map(|handlers| {
1266                let executor = executor.clone();
1267                handlers.insert(
1268                    RequestId::Int(id),
1269                    Box::new(move |result| {
1270                        executor
1271                            .spawn(async move {
1272                                let response = match result {
1273                                    Ok(response) => match serde_json::from_str(&response) {
1274                                        Ok(deserialized) => Ok(deserialized),
1275                                        Err(error) => {
1276                                            log::error!("failed to deserialize response from language server: {}. response from language server: {:?}", error, response);
1277                                            Err(error).context("failed to deserialize response")
1278                                        }
1279                                    }
1280                                    Err(error) => Err(anyhow!("{}", error.message)),
1281                                };
1282                                _ = tx.send(response);
1283                            })
1284                            .detach();
1285                    }),
1286                );
1287            });
1288
1289        let send = outbound_tx
1290            .try_send(message)
1291            .context("failed to write to language server's stdin");
1292
1293        let notification_serializers = notification_serializers.downgrade();
1294        let started = Instant::now();
1295        LspRequest::new(id, async move {
1296            if let Err(e) = handle_response {
1297                return ConnectionResult::Result(Err(e));
1298            }
1299            if let Err(e) = send {
1300                return ConnectionResult::Result(Err(e));
1301            }
1302
1303            let cancel_on_drop = util::defer(move || {
1304                if let Some(notification_serializers) = notification_serializers.upgrade() {
1305                    Self::notify_internal::<notification::Cancel>(
1306                        &notification_serializers,
1307                        CancelParams {
1308                            id: NumberOrString::Number(id),
1309                        },
1310                    )
1311                    .ok();
1312                }
1313            });
1314
1315            let method = T::METHOD;
1316            select! {
1317                response = rx.fuse() => {
1318                    let elapsed = started.elapsed();
1319                    log::trace!("Took {elapsed:?} to receive response to {method:?} id {id}");
1320                    cancel_on_drop.abort();
1321                    match response {
1322                        Ok(response_result) => ConnectionResult::Result(response_result),
1323                        Err(Canceled) => {
1324                            log::error!("Server reset connection for a request {method:?} id {id}");
1325                            ConnectionResult::ConnectionReset
1326                        },
1327                    }
1328                }
1329
1330                message = timer.fuse() => {
1331                    log::error!("Cancelled LSP request task for {method:?} id {id} {message}");
1332                    ConnectionResult::Timeout
1333                }
1334            }
1335        })
1336    }
1337
1338    fn request_internal<T>(
1339        next_id: &AtomicI32,
1340        response_handlers: &Mutex<Option<HashMap<RequestId, ResponseHandler>>>,
1341        outbound_tx: &channel::Sender<String>,
1342        notification_serializers: &channel::Sender<NotificationSerializer>,
1343        executor: &BackgroundExecutor,
1344        params: T::Params,
1345    ) -> impl LspRequestFuture<T::Result> + use<T>
1346    where
1347        T::Result: 'static + Send,
1348        T: request::Request,
1349    {
1350        Self::request_internal_with_timer::<T, _>(
1351            next_id,
1352            response_handlers,
1353            outbound_tx,
1354            notification_serializers,
1355            executor,
1356            Self::default_request_timer(executor.clone()),
1357            params,
1358        )
1359    }
1360
1361    pub fn default_request_timer(executor: BackgroundExecutor) -> impl Future<Output = String> {
1362        executor
1363            .timer(LSP_REQUEST_TIMEOUT)
1364            .map(|_| format!("which took over {LSP_REQUEST_TIMEOUT:?}"))
1365    }
1366
1367    /// Sends a RPC notification to the language server.
1368    ///
1369    /// [LSP Specification](https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#notificationMessage)
1370    pub fn notify<T: notification::Notification>(&self, params: T::Params) -> Result<()> {
1371        let outbound = self.notification_tx.clone();
1372        Self::notify_internal::<T>(&outbound, params)
1373    }
1374
1375    fn notify_internal<T: notification::Notification>(
1376        outbound_tx: &channel::Sender<NotificationSerializer>,
1377        params: T::Params,
1378    ) -> Result<()> {
1379        let serializer = NotificationSerializer(Box::new(move || {
1380            serde_json::to_string(&Notification {
1381                jsonrpc: JSON_RPC_VERSION,
1382                method: T::METHOD,
1383                params,
1384            })
1385            .unwrap()
1386        }));
1387
1388        outbound_tx.send_blocking(serializer)?;
1389        Ok(())
1390    }
1391
1392    /// Add new workspace folder to the list.
1393    pub fn add_workspace_folder(&self, uri: Uri) {
1394        if self
1395            .capabilities()
1396            .workspace
1397            .and_then(|ws| {
1398                ws.workspace_folders.and_then(|folders| {
1399                    folders
1400                        .change_notifications
1401                        .map(|caps| matches!(caps, OneOf::Left(false)))
1402                })
1403            })
1404            .unwrap_or(true)
1405        {
1406            return;
1407        }
1408
1409        let Some(workspace_folders) = self.workspace_folders.as_ref() else {
1410            return;
1411        };
1412        let is_new_folder = workspace_folders.lock().insert(uri.clone());
1413        if is_new_folder {
1414            let params = DidChangeWorkspaceFoldersParams {
1415                event: WorkspaceFoldersChangeEvent {
1416                    added: vec![WorkspaceFolder {
1417                        uri,
1418                        name: String::default(),
1419                    }],
1420                    removed: vec![],
1421                },
1422            };
1423            self.notify::<DidChangeWorkspaceFolders>(params).ok();
1424        }
1425    }
1426
1427    /// Remove existing workspace folder from the list.
1428    pub fn remove_workspace_folder(&self, uri: Uri) {
1429        if self
1430            .capabilities()
1431            .workspace
1432            .and_then(|ws| {
1433                ws.workspace_folders.and_then(|folders| {
1434                    folders
1435                        .change_notifications
1436                        .map(|caps| !matches!(caps, OneOf::Left(false)))
1437                })
1438            })
1439            .unwrap_or(true)
1440        {
1441            return;
1442        }
1443        let Some(workspace_folders) = self.workspace_folders.as_ref() else {
1444            return;
1445        };
1446        let was_removed = workspace_folders.lock().remove(&uri);
1447        if was_removed {
1448            let params = DidChangeWorkspaceFoldersParams {
1449                event: WorkspaceFoldersChangeEvent {
1450                    added: vec![],
1451                    removed: vec![WorkspaceFolder {
1452                        uri,
1453                        name: String::default(),
1454                    }],
1455                },
1456            };
1457            self.notify::<DidChangeWorkspaceFolders>(params).ok();
1458        }
1459    }
1460    pub fn set_workspace_folders(&self, folders: BTreeSet<Uri>) {
1461        let Some(workspace_folders) = self.workspace_folders.as_ref() else {
1462            return;
1463        };
1464        let mut workspace_folders = workspace_folders.lock();
1465
1466        let old_workspace_folders = std::mem::take(&mut *workspace_folders);
1467        let added: Vec<_> = folders
1468            .difference(&old_workspace_folders)
1469            .map(|uri| WorkspaceFolder {
1470                uri: uri.clone(),
1471                name: String::default(),
1472            })
1473            .collect();
1474
1475        let removed: Vec<_> = old_workspace_folders
1476            .difference(&folders)
1477            .map(|uri| WorkspaceFolder {
1478                uri: uri.clone(),
1479                name: String::default(),
1480            })
1481            .collect();
1482        *workspace_folders = folders;
1483        let should_notify = !added.is_empty() || !removed.is_empty();
1484        if should_notify {
1485            drop(workspace_folders);
1486            let params = DidChangeWorkspaceFoldersParams {
1487                event: WorkspaceFoldersChangeEvent { added, removed },
1488            };
1489            self.notify::<DidChangeWorkspaceFolders>(params).ok();
1490        }
1491    }
1492
1493    pub fn workspace_folders(&self) -> BTreeSet<Uri> {
1494        self.workspace_folders.as_ref().map_or_else(
1495            || BTreeSet::from_iter([self.root_uri.clone()]),
1496            |folders| folders.lock().clone(),
1497        )
1498    }
1499
1500    pub fn register_buffer(
1501        &self,
1502        uri: Uri,
1503        language_id: String,
1504        version: i32,
1505        initial_text: String,
1506    ) {
1507        self.notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
1508            text_document: TextDocumentItem::new(uri, language_id, version, initial_text),
1509        })
1510        .ok();
1511    }
1512
1513    pub fn unregister_buffer(&self, uri: Uri) {
1514        self.notify::<notification::DidCloseTextDocument>(DidCloseTextDocumentParams {
1515            text_document: TextDocumentIdentifier::new(uri),
1516        })
1517        .ok();
1518    }
1519}
1520
1521impl Drop for LanguageServer {
1522    fn drop(&mut self) {
1523        if let Some(shutdown) = self.shutdown() {
1524            self.executor.spawn(shutdown).detach();
1525        }
1526    }
1527}
1528
1529impl Subscription {
1530    /// Detaching a subscription handle prevents it from unsubscribing on drop.
1531    pub fn detach(&mut self) {
1532        match self {
1533            Subscription::Notification {
1534                notification_handlers,
1535                ..
1536            } => *notification_handlers = None,
1537            Subscription::Io { io_handlers, .. } => *io_handlers = None,
1538        }
1539    }
1540}
1541
1542impl fmt::Display for LanguageServerId {
1543    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1544        self.0.fmt(f)
1545    }
1546}
1547
1548impl fmt::Debug for LanguageServer {
1549    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1550        f.debug_struct("LanguageServer")
1551            .field("id", &self.server_id.0)
1552            .field("name", &self.name)
1553            .finish_non_exhaustive()
1554    }
1555}
1556
1557impl fmt::Debug for LanguageServerBinary {
1558    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1559        let mut debug = f.debug_struct("LanguageServerBinary");
1560        debug.field("path", &self.path);
1561        debug.field("arguments", &self.arguments);
1562
1563        if let Some(env) = &self.env {
1564            let redacted_env: BTreeMap<String, String> = env
1565                .iter()
1566                .map(|(key, value)| {
1567                    let redacted_value = if redact::should_redact(key) {
1568                        "REDACTED".to_string()
1569                    } else {
1570                        value.clone()
1571                    };
1572                    (key.clone(), redacted_value)
1573                })
1574                .collect();
1575            debug.field("env", &Some(redacted_env));
1576        } else {
1577            debug.field("env", &self.env);
1578        }
1579
1580        debug.finish()
1581    }
1582}
1583
1584impl Drop for Subscription {
1585    fn drop(&mut self) {
1586        match self {
1587            Subscription::Notification {
1588                method,
1589                notification_handlers,
1590            } => {
1591                if let Some(handlers) = notification_handlers {
1592                    handlers.lock().remove(method);
1593                }
1594            }
1595            Subscription::Io { id, io_handlers } => {
1596                if let Some(io_handlers) = io_handlers.as_ref().and_then(|h| h.upgrade()) {
1597                    io_handlers.lock().remove(id);
1598                }
1599            }
1600        }
1601    }
1602}
1603
1604/// Mock language server for use in tests.
1605#[cfg(any(test, feature = "test-support"))]
1606#[derive(Clone)]
1607pub struct FakeLanguageServer {
1608    pub binary: LanguageServerBinary,
1609    pub server: Arc<LanguageServer>,
1610    notifications_rx: channel::Receiver<(String, String)>,
1611}
1612
1613#[cfg(any(test, feature = "test-support"))]
1614impl FakeLanguageServer {
1615    /// Construct a fake language server.
1616    pub fn new(
1617        server_id: LanguageServerId,
1618        binary: LanguageServerBinary,
1619        name: String,
1620        capabilities: ServerCapabilities,
1621        cx: &mut AsyncApp,
1622    ) -> (LanguageServer, FakeLanguageServer) {
1623        let (stdin_writer, stdin_reader) = async_pipe::pipe();
1624        let (stdout_writer, stdout_reader) = async_pipe::pipe();
1625        let (notifications_tx, notifications_rx) = channel::unbounded();
1626
1627        let server_name = LanguageServerName(name.clone().into());
1628        let process_name = Arc::from(name.as_str());
1629        let root = Self::root_path();
1630        let workspace_folders: Arc<Mutex<BTreeSet<Uri>>> = Default::default();
1631        let mut server = LanguageServer::new_internal(
1632            server_id,
1633            server_name.clone(),
1634            stdin_writer,
1635            stdout_reader,
1636            None::<async_pipe::PipeReader>,
1637            Arc::new(Mutex::new(None)),
1638            None,
1639            None,
1640            binary.clone(),
1641            root,
1642            Some(workspace_folders.clone()),
1643            cx,
1644            |_| false,
1645        );
1646        server.process_name = process_name;
1647        let fake = FakeLanguageServer {
1648            binary: binary.clone(),
1649            server: Arc::new({
1650                let mut server = LanguageServer::new_internal(
1651                    server_id,
1652                    server_name,
1653                    stdout_writer,
1654                    stdin_reader,
1655                    None::<async_pipe::PipeReader>,
1656                    Arc::new(Mutex::new(None)),
1657                    None,
1658                    None,
1659                    binary,
1660                    Self::root_path(),
1661                    Some(workspace_folders),
1662                    cx,
1663                    move |msg| {
1664                        notifications_tx
1665                            .try_send((
1666                                msg.method.to_string(),
1667                                msg.params.as_ref().unwrap_or(&Value::Null).to_string(),
1668                            ))
1669                            .ok();
1670                        true
1671                    },
1672                );
1673                server.process_name = name.as_str().into();
1674                server
1675            }),
1676            notifications_rx,
1677        };
1678        fake.set_request_handler::<request::Initialize, _, _>({
1679            let capabilities = capabilities;
1680            move |_, _| {
1681                let capabilities = capabilities.clone();
1682                let name = name.clone();
1683                async move {
1684                    Ok(InitializeResult {
1685                        capabilities,
1686                        server_info: Some(ServerInfo {
1687                            name,
1688                            ..Default::default()
1689                        }),
1690                    })
1691                }
1692            }
1693        });
1694
1695        fake.set_request_handler::<request::Shutdown, _, _>(|_, _| async move { Ok(()) });
1696
1697        (server, fake)
1698    }
1699    #[cfg(target_os = "windows")]
1700    fn root_path() -> Uri {
1701        Uri::from_file_path("C:/").unwrap()
1702    }
1703
1704    #[cfg(not(target_os = "windows"))]
1705    fn root_path() -> Uri {
1706        Uri::from_file_path("/").unwrap()
1707    }
1708}
1709
1710#[cfg(any(test, feature = "test-support"))]
1711impl LanguageServer {
1712    pub fn full_capabilities() -> ServerCapabilities {
1713        ServerCapabilities {
1714            document_highlight_provider: Some(OneOf::Left(true)),
1715            code_action_provider: Some(CodeActionProviderCapability::Simple(true)),
1716            document_formatting_provider: Some(OneOf::Left(true)),
1717            document_range_formatting_provider: Some(OneOf::Left(true)),
1718            definition_provider: Some(OneOf::Left(true)),
1719            workspace_symbol_provider: Some(OneOf::Left(true)),
1720            implementation_provider: Some(ImplementationProviderCapability::Simple(true)),
1721            type_definition_provider: Some(TypeDefinitionProviderCapability::Simple(true)),
1722            ..ServerCapabilities::default()
1723        }
1724    }
1725}
1726
1727#[cfg(any(test, feature = "test-support"))]
1728impl FakeLanguageServer {
1729    /// See [`LanguageServer::notify`].
1730    pub fn notify<T: notification::Notification>(&self, params: T::Params) {
1731        self.server.notify::<T>(params).ok();
1732    }
1733
1734    /// See [`LanguageServer::request`].
1735    pub async fn request<T>(&self, params: T::Params) -> ConnectionResult<T::Result>
1736    where
1737        T: request::Request,
1738        T::Result: 'static + Send,
1739    {
1740        self.server.executor.start_waiting();
1741        self.server.request::<T>(params).await
1742    }
1743
1744    /// Attempts [`Self::try_receive_notification`], unwrapping if it has not received the specified type yet.
1745    pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
1746        self.server.executor.start_waiting();
1747        self.try_receive_notification::<T>().await.unwrap()
1748    }
1749
1750    /// Consumes the notification channel until it finds a notification for the specified type.
1751    pub async fn try_receive_notification<T: notification::Notification>(
1752        &mut self,
1753    ) -> Option<T::Params> {
1754        loop {
1755            let (method, params) = self.notifications_rx.recv().await.ok()?;
1756            if method == T::METHOD {
1757                return Some(serde_json::from_str::<T::Params>(&params).unwrap());
1758            } else {
1759                log::info!("skipping message in fake language server {:?}", params);
1760            }
1761        }
1762    }
1763
1764    /// Registers a handler for a specific kind of request. Removes any existing handler for specified request type.
1765    pub fn set_request_handler<T, F, Fut>(
1766        &self,
1767        mut handler: F,
1768    ) -> futures::channel::mpsc::UnboundedReceiver<()>
1769    where
1770        T: 'static + request::Request,
1771        T::Params: 'static + Send,
1772        F: 'static + Send + FnMut(T::Params, gpui::AsyncApp) -> Fut,
1773        Fut: 'static + Future<Output = Result<T::Result>>,
1774    {
1775        let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
1776        self.server.remove_request_handler::<T>();
1777        self.server
1778            .on_request::<T, _, _>(move |params, cx| {
1779                let result = handler(params, cx.clone());
1780                let responded_tx = responded_tx.clone();
1781                let executor = cx.background_executor().clone();
1782                async move {
1783                    executor.simulate_random_delay().await;
1784                    let result = result.await;
1785                    responded_tx.unbounded_send(()).ok();
1786                    result
1787                }
1788            })
1789            .detach();
1790        responded_rx
1791    }
1792
1793    /// Registers a handler for a specific kind of notification. Removes any existing handler for specified notification type.
1794    pub fn handle_notification<T, F>(
1795        &self,
1796        mut handler: F,
1797    ) -> futures::channel::mpsc::UnboundedReceiver<()>
1798    where
1799        T: 'static + notification::Notification,
1800        T::Params: 'static + Send,
1801        F: 'static + Send + FnMut(T::Params, gpui::AsyncApp),
1802    {
1803        let (handled_tx, handled_rx) = futures::channel::mpsc::unbounded();
1804        self.server.remove_notification_handler::<T>();
1805        self.server
1806            .on_notification::<T, _>(move |params, cx| {
1807                handler(params, cx.clone());
1808                handled_tx.unbounded_send(()).ok();
1809            })
1810            .detach();
1811        handled_rx
1812    }
1813
1814    /// Removes any existing handler for specified notification type.
1815    pub fn remove_request_handler<T>(&mut self)
1816    where
1817        T: 'static + request::Request,
1818    {
1819        self.server.remove_request_handler::<T>();
1820    }
1821
1822    /// Simulate that the server has started work and notifies about its progress with the specified token.
1823    pub async fn start_progress(&self, token: impl Into<String>) {
1824        self.start_progress_with(token, Default::default()).await
1825    }
1826
1827    pub async fn start_progress_with(
1828        &self,
1829        token: impl Into<String>,
1830        progress: WorkDoneProgressBegin,
1831    ) {
1832        let token = token.into();
1833        self.request::<request::WorkDoneProgressCreate>(WorkDoneProgressCreateParams {
1834            token: NumberOrString::String(token.clone()),
1835        })
1836        .await
1837        .into_response()
1838        .unwrap();
1839        self.notify::<notification::Progress>(ProgressParams {
1840            token: NumberOrString::String(token),
1841            value: ProgressParamsValue::WorkDone(WorkDoneProgress::Begin(progress)),
1842        });
1843    }
1844
1845    /// Simulate that the server has completed work and notifies about that with the specified token.
1846    pub fn end_progress(&self, token: impl Into<String>) {
1847        self.notify::<notification::Progress>(ProgressParams {
1848            token: NumberOrString::String(token.into()),
1849            value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
1850        });
1851    }
1852}
1853
1854#[cfg(test)]
1855mod tests {
1856    use super::*;
1857    use gpui::{SemanticVersion, TestAppContext};
1858    use std::str::FromStr;
1859
1860    #[ctor::ctor]
1861    fn init_logger() {
1862        zlog::init_test();
1863    }
1864
1865    #[gpui::test]
1866    async fn test_fake(cx: &mut TestAppContext) {
1867        cx.update(|cx| {
1868            release_channel::init(SemanticVersion::default(), cx);
1869        });
1870        let (server, mut fake) = FakeLanguageServer::new(
1871            LanguageServerId(0),
1872            LanguageServerBinary {
1873                path: "path/to/language-server".into(),
1874                arguments: vec![],
1875                env: None,
1876            },
1877            "the-lsp".to_string(),
1878            Default::default(),
1879            &mut cx.to_async(),
1880        );
1881
1882        let (message_tx, message_rx) = channel::unbounded();
1883        let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
1884        server
1885            .on_notification::<notification::ShowMessage, _>(move |params, _| {
1886                message_tx.try_send(params).unwrap()
1887            })
1888            .detach();
1889        server
1890            .on_notification::<notification::PublishDiagnostics, _>(move |params, _| {
1891                diagnostics_tx.try_send(params).unwrap()
1892            })
1893            .detach();
1894
1895        let server = cx
1896            .update(|cx| {
1897                let params = server.default_initialize_params(false, cx);
1898                let configuration = DidChangeConfigurationParams {
1899                    settings: Default::default(),
1900                };
1901                server.initialize(params, configuration.into(), cx)
1902            })
1903            .await
1904            .unwrap();
1905        server
1906            .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
1907                text_document: TextDocumentItem::new(
1908                    Uri::from_str("file://a/b").unwrap(),
1909                    "rust".to_string(),
1910                    0,
1911                    "".to_string(),
1912                ),
1913            })
1914            .unwrap();
1915        assert_eq!(
1916            fake.receive_notification::<notification::DidOpenTextDocument>()
1917                .await
1918                .text_document
1919                .uri
1920                .as_str(),
1921            "file://a/b"
1922        );
1923
1924        fake.notify::<notification::ShowMessage>(ShowMessageParams {
1925            typ: MessageType::ERROR,
1926            message: "ok".to_string(),
1927        });
1928        fake.notify::<notification::PublishDiagnostics>(PublishDiagnosticsParams {
1929            uri: Uri::from_str("file://b/c").unwrap(),
1930            version: Some(5),
1931            diagnostics: vec![],
1932        });
1933        assert_eq!(message_rx.recv().await.unwrap().message, "ok");
1934        assert_eq!(
1935            diagnostics_rx.recv().await.unwrap().uri.as_str(),
1936            "file://b/c"
1937        );
1938
1939        fake.set_request_handler::<request::Shutdown, _, _>(|_, _| async move { Ok(()) });
1940
1941        drop(server);
1942        cx.run_until_parked();
1943        fake.receive_notification::<notification::Exit>().await;
1944    }
1945
1946    #[gpui::test]
1947    fn test_deserialize_string_digit_id() {
1948        let json = r#"{"jsonrpc":"2.0","id":"2","method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
1949        let notification = serde_json::from_str::<NotificationOrRequest>(json)
1950            .expect("message with string id should be parsed");
1951        let expected_id = RequestId::Str("2".to_string());
1952        assert_eq!(notification.id, Some(expected_id));
1953    }
1954
1955    #[gpui::test]
1956    fn test_deserialize_string_id() {
1957        let json = r#"{"jsonrpc":"2.0","id":"anythingAtAll","method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
1958        let notification = serde_json::from_str::<NotificationOrRequest>(json)
1959            .expect("message with string id should be parsed");
1960        let expected_id = RequestId::Str("anythingAtAll".to_string());
1961        assert_eq!(notification.id, Some(expected_id));
1962    }
1963
1964    #[gpui::test]
1965    fn test_deserialize_int_id() {
1966        let json = r#"{"jsonrpc":"2.0","id":2,"method":"workspace/configuration","params":{"items":[{"scopeUri":"file:///Users/mph/Devel/personal/hello-scala/","section":"metals"}]}}"#;
1967        let notification = serde_json::from_str::<NotificationOrRequest>(json)
1968            .expect("message with string id should be parsed");
1969        let expected_id = RequestId::Int(2);
1970        assert_eq!(notification.id, Some(expected_id));
1971    }
1972
1973    #[test]
1974    fn test_serialize_has_no_nulls() {
1975        // Ensure we're not setting both result and error variants. (ticket #10595)
1976        let no_tag = Response::<u32> {
1977            jsonrpc: "",
1978            id: RequestId::Int(0),
1979            value: LspResult::Ok(None),
1980        };
1981        assert_eq!(
1982            serde_json::to_string(&no_tag).unwrap(),
1983            "{\"jsonrpc\":\"\",\"id\":0,\"result\":null}"
1984        );
1985        let no_tag = Response::<u32> {
1986            jsonrpc: "",
1987            id: RequestId::Int(0),
1988            value: LspResult::Error(None),
1989        };
1990        assert_eq!(
1991            serde_json::to_string(&no_tag).unwrap(),
1992            "{\"jsonrpc\":\"\",\"id\":0,\"error\":null}"
1993        );
1994    }
1995}