Log language server stderr output in server logs (#2918)

Kirill Bulatov created

<img width="1728" alt="Screenshot 2023-08-31 at 01 07 11"
src="https://github.com/zed-industries/zed/assets/2690773/537a18d6-59bf-4a77-896f-fc2cb6dc7fe8">

Line by line, we print stderr to help with debugging and servers that
log into stderr.

Release Notes:

- N/A

Change summary

crates/language_tools/src/lsp_log.rs | 74 ++++++++++++++++----------
crates/lsp/src/lsp.rs                | 83 +++++++++++++++++++++++------
crates/zed/src/languages/lua.rs      | 13 +---
3 files changed, 113 insertions(+), 57 deletions(-)

Detailed changes

crates/language_tools/src/lsp_log.rs 🔗

@@ -12,6 +12,7 @@ use gpui::{
     ViewHandle, WeakModelHandle,
 };
 use language::{Buffer, LanguageServerId, LanguageServerName};
+use lsp::IoKind;
 use project::{Project, Worktree};
 use std::{borrow::Cow, sync::Arc};
 use theme::{ui, Theme};
@@ -26,7 +27,7 @@ const RECEIVE_LINE: &str = "// Receive:\n";
 
 pub struct LogStore {
     projects: HashMap<WeakModelHandle<Project>, ProjectState>,
-    io_tx: mpsc::UnboundedSender<(WeakModelHandle<Project>, LanguageServerId, bool, String)>,
+    io_tx: mpsc::UnboundedSender<(WeakModelHandle<Project>, LanguageServerId, IoKind, String)>,
 }
 
 struct ProjectState {
@@ -37,12 +38,12 @@ struct ProjectState {
 struct LanguageServerState {
     log_buffer: ModelHandle<Buffer>,
     rpc_state: Option<LanguageServerRpcState>,
+    _subscription: Option<lsp::Subscription>,
 }
 
 struct LanguageServerRpcState {
     buffer: ModelHandle<Buffer>,
     last_message_kind: Option<MessageKind>,
-    _subscription: lsp::Subscription,
 }
 
 pub struct LspLogView {
@@ -118,11 +119,11 @@ impl LogStore {
             io_tx,
         };
         cx.spawn_weak(|this, mut cx| async move {
-            while let Some((project, server_id, is_output, mut message)) = io_rx.next().await {
+            while let Some((project, server_id, io_kind, mut message)) = io_rx.next().await {
                 if let Some(this) = this.upgrade(&cx) {
                     this.update(&mut cx, |this, cx| {
                         message.push('\n');
-                        this.on_io(project, server_id, is_output, &message, cx);
+                        this.on_io(project, server_id, io_kind, &message, cx);
                     });
                 }
             }
@@ -168,22 +169,29 @@ impl LogStore {
         cx: &mut ModelContext<Self>,
     ) -> Option<ModelHandle<Buffer>> {
         let project_state = self.projects.get_mut(&project.downgrade())?;
-        Some(
-            project_state
-                .servers
-                .entry(id)
-                .or_insert_with(|| {
-                    cx.notify();
-                    LanguageServerState {
-                        rpc_state: None,
-                        log_buffer: cx
-                            .add_model(|cx| Buffer::new(0, cx.model_id() as u64, ""))
-                            .clone(),
-                    }
-                })
-                .log_buffer
-                .clone(),
-        )
+        let server_state = project_state.servers.entry(id).or_insert_with(|| {
+            cx.notify();
+            LanguageServerState {
+                rpc_state: None,
+                log_buffer: cx
+                    .add_model(|cx| Buffer::new(0, cx.model_id() as u64, ""))
+                    .clone(),
+                _subscription: None,
+            }
+        });
+
+        let server = project.read(cx).language_server_for_id(id);
+        let weak_project = project.downgrade();
+        let io_tx = self.io_tx.clone();
+        server_state._subscription = server.map(|server| {
+            server.on_io(move |io_kind, message| {
+                io_tx
+                    .unbounded_send((weak_project, id, io_kind, message.to_string()))
+                    .ok();
+            })
+        });
+
+        Some(server_state.log_buffer.clone())
     }
 
     fn add_language_server_log(
@@ -230,7 +238,7 @@ impl LogStore {
         Some(server_state.log_buffer.clone())
     }
 
-    pub fn enable_rpc_trace_for_language_server(
+    fn enable_rpc_trace_for_language_server(
         &mut self,
         project: &ModelHandle<Project>,
         server_id: LanguageServerId,
@@ -239,9 +247,7 @@ impl LogStore {
         let weak_project = project.downgrade();
         let project_state = self.projects.get_mut(&weak_project)?;
         let server_state = project_state.servers.get_mut(&server_id)?;
-        let server = project.read(cx).language_server_for_id(server_id)?;
         let rpc_state = server_state.rpc_state.get_or_insert_with(|| {
-            let io_tx = self.io_tx.clone();
             let language = project.read(cx).languages().language_for_name("JSON");
             let buffer = cx.add_model(|cx| Buffer::new(0, cx.model_id() as u64, ""));
             cx.spawn_weak({
@@ -258,11 +264,6 @@ impl LogStore {
             LanguageServerRpcState {
                 buffer,
                 last_message_kind: None,
-                _subscription: server.on_io(move |is_received, json| {
-                    io_tx
-                        .unbounded_send((weak_project, server_id, is_received, json.to_string()))
-                        .ok();
-                }),
             }
         });
         Some(rpc_state.buffer.clone())
@@ -285,10 +286,25 @@ impl LogStore {
         &mut self,
         project: WeakModelHandle<Project>,
         language_server_id: LanguageServerId,
-        is_received: bool,
+        io_kind: IoKind,
         message: &str,
         cx: &mut AppContext,
     ) -> Option<()> {
+        let is_received = match io_kind {
+            IoKind::StdOut => true,
+            IoKind::StdIn => false,
+            IoKind::StdErr => {
+                let project = project.upgrade(cx)?;
+                project.update(cx, |_, cx| {
+                    cx.emit(project::Event::LanguageServerLog(
+                        language_server_id,
+                        format!("stderr: {}\n", message.trim()),
+                    ))
+                });
+                return Some(());
+            }
+        };
+
         let state = self
             .projects
             .get_mut(&project)?

crates/lsp/src/lsp.rs 🔗

@@ -35,7 +35,14 @@ const CONTENT_LEN_HEADER: &str = "Content-Length: ";
 
 type NotificationHandler = Box<dyn Send + FnMut(Option<usize>, &str, AsyncAppContext)>;
 type ResponseHandler = Box<dyn Send + FnOnce(Result<String, Error>)>;
-type IoHandler = Box<dyn Send + FnMut(bool, &str)>;
+type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
+
+#[derive(Debug, Clone, Copy)]
+pub enum IoKind {
+    StdOut,
+    StdIn,
+    StdErr,
+}
 
 #[derive(Debug, Clone, Deserialize)]
 pub struct LanguageServerBinary {
@@ -144,16 +151,18 @@ impl LanguageServer {
             .args(binary.arguments)
             .stdin(Stdio::piped())
             .stdout(Stdio::piped())
-            .stderr(Stdio::inherit())
+            .stderr(Stdio::piped())
             .kill_on_drop(true)
             .spawn()?;
 
         let stdin = server.stdin.take().unwrap();
-        let stout = server.stdout.take().unwrap();
+        let stdout = server.stdout.take().unwrap();
+        let stderr = server.stderr.take().unwrap();
         let mut server = Self::new_internal(
             server_id.clone(),
             stdin,
-            stout,
+            stdout,
+            Some(stderr),
             Some(server),
             root_path,
             code_action_kinds,
@@ -181,10 +190,11 @@ impl LanguageServer {
         Ok(server)
     }
 
-    fn new_internal<Stdin, Stdout, F>(
+    fn new_internal<Stdin, Stdout, Stderr, F>(
         server_id: LanguageServerId,
         stdin: Stdin,
         stdout: Stdout,
+        stderr: Option<Stderr>,
         server: Option<Child>,
         root_path: &Path,
         code_action_kinds: Option<Vec<CodeActionKind>>,
@@ -194,7 +204,8 @@ impl LanguageServer {
     where
         Stdin: AsyncWrite + Unpin + Send + 'static,
         Stdout: AsyncRead + Unpin + Send + 'static,
-        F: FnMut(AnyNotification) + 'static + Send,
+        Stderr: AsyncRead + Unpin + Send + 'static,
+        F: FnMut(AnyNotification) + 'static + Send + Clone,
     {
         let (outbound_tx, outbound_rx) = channel::unbounded::<String>();
         let (output_done_tx, output_done_rx) = barrier::channel();
@@ -203,17 +214,27 @@ impl LanguageServer {
         let response_handlers =
             Arc::new(Mutex::new(Some(HashMap::<_, ResponseHandler>::default())));
         let io_handlers = Arc::new(Mutex::new(HashMap::default()));
-        let input_task = cx.spawn(|cx| {
-            Self::handle_input(
-                stdout,
-                on_unhandled_notification,
-                notification_handlers.clone(),
-                response_handlers.clone(),
-                io_handlers.clone(),
-                cx,
-            )
+
+        let stdout_input_task = cx.spawn(|cx| {
+            {
+                Self::handle_input(
+                    stdout,
+                    on_unhandled_notification.clone(),
+                    notification_handlers.clone(),
+                    response_handlers.clone(),
+                    io_handlers.clone(),
+                    cx,
+                )
+            }
             .log_err()
         });
+        let stderr_input_task = stderr
+            .map(|stderr| cx.spawn(|_| Self::handle_stderr(stderr, io_handlers.clone()).log_err()))
+            .unwrap_or_else(|| Task::Ready(Some(None)));
+        let input_task = cx.spawn(|_| async move {
+            let (stdout, stderr) = futures::join!(stdout_input_task, stderr_input_task);
+            stdout.or(stderr)
+        });
         let output_task = cx.background().spawn({
             Self::handle_output(
                 stdin,
@@ -284,7 +305,7 @@ impl LanguageServer {
             if let Ok(message) = str::from_utf8(&buffer) {
                 log::trace!("incoming message:{}", message);
                 for handler in io_handlers.lock().values_mut() {
-                    handler(true, message);
+                    handler(IoKind::StdOut, message);
                 }
             }
 
@@ -327,6 +348,30 @@ impl LanguageServer {
         }
     }
 
+    async fn handle_stderr<Stderr>(
+        stderr: Stderr,
+        io_handlers: Arc<Mutex<HashMap<usize, IoHandler>>>,
+    ) -> anyhow::Result<()>
+    where
+        Stderr: AsyncRead + Unpin + Send + 'static,
+    {
+        let mut stderr = BufReader::new(stderr);
+        let mut buffer = Vec::new();
+        loop {
+            buffer.clear();
+            stderr.read_until(b'\n', &mut buffer).await?;
+            if let Ok(message) = str::from_utf8(&buffer) {
+                log::trace!("incoming stderr message:{message}");
+                for handler in io_handlers.lock().values_mut() {
+                    handler(IoKind::StdErr, message);
+                }
+            }
+
+            // Don't starve the main thread when receiving lots of messages at once.
+            smol::future::yield_now().await;
+        }
+    }
+
     async fn handle_output<Stdin>(
         stdin: Stdin,
         outbound_rx: channel::Receiver<String>,
@@ -348,7 +393,7 @@ impl LanguageServer {
         while let Ok(message) = outbound_rx.recv().await {
             log::trace!("outgoing message:{}", message);
             for handler in io_handlers.lock().values_mut() {
-                handler(false, &message);
+                handler(IoKind::StdIn, &message);
             }
 
             content_len_buffer.clear();
@@ -532,7 +577,7 @@ impl LanguageServer {
     #[must_use]
     pub fn on_io<F>(&self, f: F) -> Subscription
     where
-        F: 'static + Send + FnMut(bool, &str),
+        F: 'static + Send + FnMut(IoKind, &str),
     {
         let id = self.next_id.fetch_add(1, SeqCst);
         self.io_handlers.lock().insert(id, Box::new(f));
@@ -851,6 +896,7 @@ impl LanguageServer {
             LanguageServerId(0),
             stdin_writer,
             stdout_reader,
+            None::<async_pipe::PipeReader>,
             None,
             Path::new("/"),
             None,
@@ -862,6 +908,7 @@ impl LanguageServer {
                 LanguageServerId(0),
                 stdout_writer,
                 stdin_reader,
+                None::<async_pipe::PipeReader>,
                 None,
                 Path::new("/"),
                 None,

crates/zed/src/languages/lua.rs 🔗

@@ -6,7 +6,7 @@ use futures::{io::BufReader, StreamExt};
 use language::{LanguageServerName, LspAdapterDelegate};
 use lsp::LanguageServerBinary;
 use smol::fs;
-use std::{any::Any, env::consts, ffi::OsString, path::PathBuf};
+use std::{any::Any, env::consts, path::PathBuf};
 use util::{
     async_iife,
     github::{latest_github_release, GitHubLspBinaryVersion},
@@ -16,13 +16,6 @@ use util::{
 #[derive(Copy, Clone)]
 pub struct LuaLspAdapter;
 
-fn server_binary_arguments() -> Vec<OsString> {
-    vec![
-        "--logpath=~/lua-language-server.log".into(),
-        "--loglevel=trace".into(),
-    ]
-}
-
 #[async_trait]
 impl super::LspAdapter for LuaLspAdapter {
     async fn name(&self) -> LanguageServerName {
@@ -83,7 +76,7 @@ impl super::LspAdapter for LuaLspAdapter {
         .await?;
         Ok(LanguageServerBinary {
             path: binary_path,
-            arguments: server_binary_arguments(),
+            arguments: Vec::new(),
         })
     }
 
@@ -127,7 +120,7 @@ async fn get_cached_server_binary(container_dir: PathBuf) -> Option<LanguageServ
         if let Some(path) = last_binary_path {
             Ok(LanguageServerBinary {
                 path,
-                arguments: server_binary_arguments(),
+                arguments: Vec::new(),
             })
         } else {
             Err(anyhow!("no cached binary"))