Let fake and real LanguageServer access AsyncAppContext in handler callbacks

Max Brunsfeld created

Also, reimplement FakeLanguageServer by wrapping LanguageServer, instead of
duplicating its functionality differently.

Change summary

crates/editor/src/editor.rs     |  14 
crates/language/src/language.rs |  18 
crates/lsp/src/lsp.rs           | 496 +++++++++++++---------------------
crates/project/src/project.rs   |  82 ++--
crates/server/src/rpc.rs        | 118 ++++---
5 files changed, 313 insertions(+), 415 deletions(-)

Detailed changes

crates/editor/src/editor.rs 🔗

@@ -8928,7 +8928,7 @@ mod tests {
             .unwrap();
 
         cx.foreground().start_waiting();
-        let mut fake_server = fake_servers.next().await.unwrap();
+        let fake_server = fake_servers.next().await.unwrap();
 
         let buffer = cx.add_model(|cx| MultiBuffer::singleton(buffer, cx));
         let (_, editor) = cx.add_window(|cx| build_editor(buffer, cx));
@@ -8942,10 +8942,10 @@ mod tests {
                     params.text_document.uri,
                     lsp::Url::from_file_path("/file.rs").unwrap()
                 );
-                Some(vec![lsp::TextEdit::new(
+                Ok(Some(vec![lsp::TextEdit::new(
                     lsp::Range::new(lsp::Position::new(0, 3), lsp::Position::new(1, 0)),
                     ", ".to_string(),
-                )])
+                )]))
             })
             .next()
             .await;
@@ -9173,7 +9173,7 @@ mod tests {
                         params.text_document_position.position,
                         lsp::Position::new(position.row, position.column)
                     );
-                    Some(lsp::CompletionResponse::Array(
+                    Ok(Some(lsp::CompletionResponse::Array(
                         completions
                             .iter()
                             .map(|(range, new_text)| lsp::CompletionItem {
@@ -9188,7 +9188,7 @@ mod tests {
                                 ..Default::default()
                             })
                             .collect(),
-                    ))
+                    )))
                 }
             })
             .next()
@@ -9202,7 +9202,7 @@ mod tests {
             fake.handle_request::<lsp::request::ResolveCompletionItem, _, _>(move |_, _| {
                 let edit = edit.clone();
                 async move {
-                    lsp::CompletionItem {
+                    Ok(lsp::CompletionItem {
                         additional_text_edits: edit.map(|(range, new_text)| {
                             vec![lsp::TextEdit::new(
                                 lsp::Range::new(
@@ -9213,7 +9213,7 @@ mod tests {
                             )]
                         }),
                         ..Default::default()
-                    }
+                    })
                 }
             })
             .next()

crates/language/src/language.rs 🔗

@@ -263,14 +263,12 @@ impl LanguageRegistry {
         #[cfg(any(test, feature = "test-support"))]
         if language.fake_adapter.is_some() {
             let language = language.clone();
-            return Some(cx.spawn(|mut cx| async move {
+            return Some(cx.spawn(|cx| async move {
                 let (servers_tx, fake_adapter) = language.fake_adapter.as_ref().unwrap();
-                let (server, mut fake_server) = cx.update(|cx| {
-                    lsp::LanguageServer::fake_with_capabilities(
-                        fake_adapter.capabilities.clone(),
-                        cx,
-                    )
-                });
+                let (server, mut fake_server) = lsp::LanguageServer::fake_with_capabilities(
+                    fake_adapter.capabilities.clone(),
+                    cx.clone(),
+                );
 
                 if let Some(initializer) = &fake_adapter.initializer {
                     initializer(&mut fake_server);
@@ -297,10 +295,9 @@ impl LanguageRegistry {
 
         let this = self.clone();
         let adapter = language.adapter.clone()?;
-        let background = cx.background().clone();
         let lsp_binary_statuses = self.lsp_binary_statuses_tx.clone();
         let login_shell_env_loaded = self.login_shell_env_loaded.clone();
-        Some(cx.background().spawn(async move {
+        Some(cx.spawn(|cx| async move {
             login_shell_env_loaded.await;
             let server_binary_path = this
                 .lsp_binary_paths
@@ -328,8 +325,7 @@ impl LanguageRegistry {
                 &server_binary_path,
                 server_args,
                 &root_path,
-                adapter.initialization_options(),
-                background,
+                cx,
             )?;
             Ok(server)
         }))

crates/lsp/src/lsp.rs 🔗

@@ -1,15 +1,17 @@
+pub use lsp_types::*;
+
 use anyhow::{anyhow, Context, Result};
 use collections::HashMap;
 use futures::{channel::oneshot, io::BufWriter, AsyncRead, AsyncWrite};
-use gpui::{executor, Task};
-use parking_lot::{Mutex, RwLock};
+use gpui::{executor, AsyncAppContext, Task};
+use parking_lot::Mutex;
 use postage::{barrier, prelude::Stream};
 use serde::{de::DeserializeOwned, Deserialize, Serialize};
 use serde_json::{json, value::RawValue, Value};
 use smol::{
     channel,
     io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
-    process::Command,
+    process,
 };
 use std::{
     future::Future,
@@ -22,15 +24,12 @@ use std::{
     },
 };
 use std::{path::Path, process::Stdio};
-use util::TryFutureExt;
-
-pub use lsp_types::*;
+use util::{ResultExt, TryFutureExt};
 
 const JSON_RPC_VERSION: &'static str = "2.0";
 const CONTENT_LEN_HEADER: &'static str = "Content-Length: ";
 
-type NotificationHandler =
-    Box<dyn Send + Sync + FnMut(Option<usize>, &str, &mut channel::Sender<Vec<u8>>) -> Result<()>>;
+type NotificationHandler = Box<dyn Send + FnMut(Option<usize>, &str, AsyncAppContext)>;
 type ResponseHandler = Box<dyn Send + FnOnce(Result<&str, Error>)>;
 
 pub struct LanguageServer {
@@ -39,18 +38,17 @@ pub struct LanguageServer {
     outbound_tx: channel::Sender<Vec<u8>>,
     name: String,
     capabilities: ServerCapabilities,
-    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
+    notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
     response_handlers: Arc<Mutex<HashMap<usize, ResponseHandler>>>,
     executor: Arc<executor::Background>,
     io_tasks: Mutex<Option<(Task<Option<()>>, Task<Option<()>>)>>,
     output_done_rx: Mutex<Option<barrier::Receiver>>,
     root_path: PathBuf,
-    options: Option<Value>,
 }
 
 pub struct Subscription {
     method: &'static str,
-    notification_handlers: Arc<RwLock<HashMap<&'static str, NotificationHandler>>>,
+    notification_handlers: Arc<Mutex<HashMap<&'static str, NotificationHandler>>>,
 }
 
 #[derive(Serialize, Deserialize)]
@@ -61,18 +59,6 @@ struct Request<'a, T> {
     params: T,
 }
 
-#[cfg(any(test, feature = "test-support"))]
-#[derive(Deserialize)]
-struct AnyRequest<'a> {
-    id: usize,
-    #[serde(borrow)]
-    jsonrpc: &'a str,
-    #[serde(borrow)]
-    method: &'a str,
-    #[serde(borrow)]
-    params: &'a RawValue,
-}
-
 #[derive(Serialize, Deserialize)]
 struct AnyResponse<'a> {
     id: usize,
@@ -85,7 +71,8 @@ struct AnyResponse<'a> {
 #[derive(Serialize)]
 struct Response<T> {
     id: usize,
-    result: T,
+    result: Option<T>,
+    error: Option<Error>,
 }
 
 #[derive(Serialize, Deserialize)]
@@ -118,15 +105,14 @@ impl LanguageServer {
         binary_path: &Path,
         args: &[&str],
         root_path: &Path,
-        options: Option<Value>,
-        background: Arc<executor::Background>,
+        cx: AsyncAppContext,
     ) -> Result<Self> {
         let working_dir = if root_path.is_dir() {
             root_path
         } else {
             root_path.parent().unwrap_or(Path::new("/"))
         };
-        let mut server = Command::new(binary_path)
+        let mut server = process::Command::new(binary_path)
             .current_dir(working_dir)
             .args(args)
             .stdin(Stdio::piped())
@@ -136,95 +122,91 @@ impl LanguageServer {
         let stdin = server.stdin.take().unwrap();
         let stdout = server.stdout.take().unwrap();
         let mut server =
-            Self::new_internal(server_id, stdin, stdout, root_path, options, background);
+            Self::new_internal(server_id, stdin, stdout, root_path, cx, |notification| {
+                log::info!(
+                    "unhandled notification {}:\n{}",
+                    notification.method,
+                    serde_json::to_string_pretty(
+                        &Value::from_str(notification.params.get()).unwrap()
+                    )
+                    .unwrap()
+                );
+            });
         if let Some(name) = binary_path.file_name() {
             server.name = name.to_string_lossy().to_string();
         }
         Ok(server)
     }
 
-    fn new_internal<Stdin, Stdout>(
+    fn new_internal<Stdin, Stdout, F>(
         server_id: usize,
         stdin: Stdin,
         stdout: Stdout,
         root_path: &Path,
-        options: Option<Value>,
-        executor: Arc<executor::Background>,
+        cx: AsyncAppContext,
+        mut on_unhandled_notification: F,
     ) -> Self
     where
         Stdin: AsyncWrite + Unpin + Send + 'static,
         Stdout: AsyncRead + Unpin + Send + 'static,
+        F: FnMut(AnyNotification) + 'static + Send,
     {
         let mut stdin = BufWriter::new(stdin);
         let mut stdout = BufReader::new(stdout);
         let (outbound_tx, outbound_rx) = channel::unbounded::<Vec<u8>>();
         let notification_handlers =
-            Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::default()));
+            Arc::new(Mutex::new(HashMap::<_, NotificationHandler>::default()));
         let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::default()));
-        let input_task = executor.spawn(
-            {
-                let notification_handlers = notification_handlers.clone();
-                let response_handlers = response_handlers.clone();
-                let mut outbound_tx = outbound_tx.clone();
-                async move {
-                    let _clear_response_handlers = ClearResponseHandlers(response_handlers.clone());
-                    let mut buffer = Vec::new();
-                    loop {
-                        buffer.clear();
-                        stdout.read_until(b'\n', &mut buffer).await?;
-                        stdout.read_until(b'\n', &mut buffer).await?;
-                        let message_len: usize = std::str::from_utf8(&buffer)?
-                            .strip_prefix(CONTENT_LEN_HEADER)
-                            .ok_or_else(|| anyhow!("invalid header"))?
-                            .trim_end()
-                            .parse()?;
-
-                        buffer.resize(message_len, 0);
-                        stdout.read_exact(&mut buffer).await?;
-                        log::trace!("incoming message:{}", String::from_utf8_lossy(&buffer));
-
-                        if let Ok(AnyNotification { id, method, params }) =
-                            serde_json::from_slice(&buffer)
-                        {
-                            if let Some(handler) = notification_handlers.write().get_mut(method) {
-                                if let Err(e) = handler(id, params.get(), &mut outbound_tx) {
-                                    log::error!("error handling {} message: {:?}", method, e);
-                                }
+        let input_task = cx.spawn(|cx| {
+            let notification_handlers = notification_handlers.clone();
+            let response_handlers = response_handlers.clone();
+            async move {
+                let _clear_response_handlers = ClearResponseHandlers(response_handlers.clone());
+                let mut buffer = Vec::new();
+                loop {
+                    buffer.clear();
+                    stdout.read_until(b'\n', &mut buffer).await?;
+                    stdout.read_until(b'\n', &mut buffer).await?;
+                    let message_len: usize = std::str::from_utf8(&buffer)?
+                        .strip_prefix(CONTENT_LEN_HEADER)
+                        .ok_or_else(|| anyhow!("invalid header"))?
+                        .trim_end()
+                        .parse()?;
+
+                    buffer.resize(message_len, 0);
+                    stdout.read_exact(&mut buffer).await?;
+                    log::trace!("incoming message:{}", String::from_utf8_lossy(&buffer));
+
+                    if let Ok(msg) = serde_json::from_slice::<AnyNotification>(&buffer) {
+                        if let Some(handler) = notification_handlers.lock().get_mut(msg.method) {
+                            handler(msg.id, msg.params.get(), cx.clone());
+                        } else {
+                            on_unhandled_notification(msg);
+                        }
+                    } else if let Ok(AnyResponse { id, error, result }) =
+                        serde_json::from_slice(&buffer)
+                    {
+                        if let Some(handler) = response_handlers.lock().remove(&id) {
+                            if let Some(error) = error {
+                                handler(Err(error));
+                            } else if let Some(result) = result {
+                                handler(Ok(result.get()));
                             } else {
-                                log::info!(
-                                    "unhandled notification {}:\n{}",
-                                    method,
-                                    serde_json::to_string_pretty(
-                                        &Value::from_str(params.get()).unwrap()
-                                    )
-                                    .unwrap()
-                                );
-                            }
-                        } else if let Ok(AnyResponse { id, error, result }) =
-                            serde_json::from_slice(&buffer)
-                        {
-                            if let Some(handler) = response_handlers.lock().remove(&id) {
-                                if let Some(error) = error {
-                                    handler(Err(error));
-                                } else if let Some(result) = result {
-                                    handler(Ok(result.get()));
-                                } else {
-                                    handler(Ok("null"));
-                                }
+                                handler(Ok("null"));
                             }
-                        } else {
-                            return Err(anyhow!(
-                                "failed to deserialize message:\n{}",
-                                std::str::from_utf8(&buffer)?
-                            ));
                         }
+                    } else {
+                        return Err(anyhow!(
+                            "failed to deserialize message:\n{}",
+                            std::str::from_utf8(&buffer)?
+                        ));
                     }
                 }
             }
-            .log_err(),
-        );
+            .log_err()
+        });
         let (output_done_tx, output_done_rx) = barrier::channel();
-        let output_task = executor.spawn({
+        let output_task = cx.background().spawn({
             let response_handlers = response_handlers.clone();
             async move {
                 let _clear_response_handlers = ClearResponseHandlers(response_handlers);
@@ -253,18 +235,15 @@ impl LanguageServer {
             capabilities: Default::default(),
             next_id: Default::default(),
             outbound_tx,
-            executor: executor.clone(),
+            executor: cx.background().clone(),
             io_tasks: Mutex::new(Some((input_task, output_task))),
             output_done_rx: Mutex::new(Some(output_done_rx)),
             root_path: root_path.to_path_buf(),
-            options,
         }
     }
 
-    pub async fn initialize(mut self) -> Result<Arc<Self>> {
-        let options = self.options.take();
-        let mut this = Arc::new(self);
-        let root_uri = Url::from_file_path(&this.root_path).unwrap();
+    pub async fn initialize(mut self, options: Option<Value>) -> Result<Arc<Self>> {
+        let root_uri = Url::from_file_path(&self.root_path).unwrap();
         #[allow(deprecated)]
         let params = InitializeParams {
             process_id: Default::default(),
@@ -290,12 +269,13 @@ impl LanguageServer {
                                 value_set: vec![
                                     CodeActionKind::REFACTOR.as_str().into(),
                                     CodeActionKind::QUICKFIX.as_str().into(),
+                                    CodeActionKind::SOURCE.as_str().into(),
                                 ],
                             },
                         }),
                         data_support: Some(true),
                         resolve_support: Some(CodeActionCapabilityResolveSupport {
-                            properties: vec!["edit".to_string()],
+                            properties: vec!["edit".to_string(), "command".to_string()],
                         }),
                         ..Default::default()
                     }),
@@ -326,16 +306,14 @@ impl LanguageServer {
             locale: Default::default(),
         };
 
-        let response = this.request::<request::Initialize>(params).await?;
-        {
-            let this = Arc::get_mut(&mut this).unwrap();
-            if let Some(info) = response.server_info {
-                this.name = info.name;
-            }
-            this.capabilities = response.capabilities;
+        let response = self.request::<request::Initialize>(params).await?;
+        if let Some(info) = response.server_info {
+            self.name = info.name;
         }
-        this.notify::<notification::Initialized>(InitializedParams {})?;
-        Ok(this)
+        self.capabilities = response.capabilities;
+
+        self.notify::<notification::Initialized>(InitializedParams {})?;
+        Ok(Arc::new(self))
     }
 
     pub fn shutdown(&self) -> Option<impl 'static + Send + Future<Output = Option<()>>> {
@@ -370,37 +348,42 @@ impl LanguageServer {
         }
     }
 
-    pub fn on_notification<T, F>(&mut self, f: F) -> Subscription
+    #[must_use]
+    pub fn on_notification<T, F>(&self, f: F) -> Subscription
     where
         T: notification::Notification,
-        F: 'static + Send + Sync + FnMut(T::Params),
+        F: 'static + Send + FnMut(T::Params, AsyncAppContext),
     {
         self.on_custom_notification(T::METHOD, f)
     }
 
-    pub fn on_request<T, F>(&mut self, f: F) -> Subscription
+    #[must_use]
+    pub fn on_request<T, F, Fut>(&self, f: F) -> Subscription
     where
         T: request::Request,
-        F: 'static + Send + Sync + FnMut(T::Params) -> Result<T::Result>,
+        T::Params: 'static + Send,
+        F: 'static + Send + FnMut(T::Params, AsyncAppContext) -> Fut,
+        Fut: 'static + Future<Output = Result<T::Result>>,
     {
         self.on_custom_request(T::METHOD, f)
     }
 
-    pub fn on_custom_notification<Params, F>(
-        &mut self,
-        method: &'static str,
-        mut f: F,
-    ) -> Subscription
+    pub fn remove_request_handler<T: request::Request>(&self) {
+        self.notification_handlers.lock().remove(T::METHOD);
+    }
+
+    #[must_use]
+    pub fn on_custom_notification<Params, F>(&self, method: &'static str, mut f: F) -> Subscription
     where
-        F: 'static + Send + Sync + FnMut(Params),
+        F: 'static + Send + FnMut(Params, AsyncAppContext),
         Params: DeserializeOwned,
     {
-        let prev_handler = self.notification_handlers.write().insert(
+        let prev_handler = self.notification_handlers.lock().insert(
             method,
-            Box::new(move |_, params, _| {
-                let params = serde_json::from_str(params)?;
-                f(params);
-                Ok(())
+            Box::new(move |_, params, cx| {
+                if let Some(params) = serde_json::from_str(params).log_err() {
+                    f(params, cx);
+                }
             }),
         );
         assert!(
@@ -413,26 +396,52 @@ impl LanguageServer {
         }
     }
 
-    pub fn on_custom_request<Params, Res, F>(
-        &mut self,
+    #[must_use]
+    pub fn on_custom_request<Params, Res, Fut, F>(
+        &self,
         method: &'static str,
         mut f: F,
     ) -> Subscription
     where
-        F: 'static + Send + Sync + FnMut(Params) -> Result<Res>,
-        Params: DeserializeOwned,
+        F: 'static + Send + FnMut(Params, AsyncAppContext) -> Fut,
+        Fut: 'static + Future<Output = Result<Res>>,
+        Params: DeserializeOwned + Send + 'static,
         Res: Serialize,
     {
-        let prev_handler = self.notification_handlers.write().insert(
+        let outbound_tx = self.outbound_tx.clone();
+        let prev_handler = self.notification_handlers.lock().insert(
             method,
-            Box::new(move |id, params, tx| {
+            Box::new(move |id, params, cx| {
                 if let Some(id) = id {
-                    let params = serde_json::from_str(params)?;
-                    let result = f(params)?;
-                    let response = serde_json::to_vec(&Response { id, result })?;
-                    tx.try_send(response)?;
+                    if let Some(params) = serde_json::from_str(params).log_err() {
+                        let response = f(params, cx.clone());
+                        cx.foreground()
+                            .spawn({
+                                let outbound_tx = outbound_tx.clone();
+                                async move {
+                                    let response = match response.await {
+                                        Ok(result) => Response {
+                                            id,
+                                            result: Some(result),
+                                            error: None,
+                                        },
+                                        Err(error) => Response {
+                                            id,
+                                            result: None,
+                                            error: Some(Error {
+                                                message: error.to_string(),
+                                            }),
+                                        },
+                                    };
+                                    if let Some(response) = serde_json::to_vec(&response).log_err()
+                                    {
+                                        outbound_tx.try_send(response).ok();
+                                    }
+                                }
+                            })
+                            .detach();
+                    }
                 }
-                Ok(())
             }),
         );
         assert!(
@@ -458,7 +467,7 @@ impl LanguageServer {
     }
 
     pub fn request<T: request::Request>(
-        self: &Arc<Self>,
+        &self,
         params: T::Params,
     ) -> impl Future<Output = Result<T::Result>>
     where
@@ -549,36 +558,16 @@ impl Subscription {
 
 impl Drop for Subscription {
     fn drop(&mut self) {
-        self.notification_handlers.write().remove(self.method);
+        self.notification_handlers.lock().remove(self.method);
     }
 }
 
 #[cfg(any(test, feature = "test-support"))]
 pub struct FakeLanguageServer {
-    handlers: FakeLanguageServerHandlers,
-    outgoing_tx: futures::channel::mpsc::UnboundedSender<Vec<u8>>,
-    incoming_rx: futures::channel::mpsc::UnboundedReceiver<Vec<u8>>,
-    _input_task: Task<Result<()>>,
-    _output_task: Task<Result<()>>,
+    server: Arc<LanguageServer>,
+    notifications_rx: channel::Receiver<(String, String)>,
 }
 
-#[cfg(any(test, feature = "test-support"))]
-type FakeLanguageServerHandlers = Arc<
-    Mutex<
-        HashMap<
-            &'static str,
-            Box<
-                dyn Send
-                    + FnMut(
-                        usize,
-                        &[u8],
-                        gpui::AsyncAppContext,
-                    ) -> futures::future::BoxFuture<'static, Vec<u8>>,
-            >,
-        >,
-    >,
->;
-
 #[cfg(any(test, feature = "test-support"))]
 impl LanguageServer {
     pub fn full_capabilities() -> ServerCapabilities {
@@ -591,177 +580,101 @@ impl LanguageServer {
         }
     }
 
-    pub fn fake(cx: &mut gpui::MutableAppContext) -> (Self, FakeLanguageServer) {
+    pub fn fake(cx: AsyncAppContext) -> (Self, FakeLanguageServer) {
         Self::fake_with_capabilities(Self::full_capabilities(), cx)
     }
 
     pub fn fake_with_capabilities(
         capabilities: ServerCapabilities,
-        cx: &mut gpui::MutableAppContext,
+        cx: AsyncAppContext,
     ) -> (Self, FakeLanguageServer) {
         let (stdin_writer, stdin_reader) = async_pipe::pipe();
         let (stdout_writer, stdout_reader) = async_pipe::pipe();
+        let (notifications_tx, notifications_rx) = channel::unbounded();
 
-        let mut fake = FakeLanguageServer::new(stdin_reader, stdout_writer, cx);
+        let server = Self::new_internal(
+            0,
+            stdin_writer,
+            stdout_reader,
+            Path::new("/"),
+            cx.clone(),
+            |_| {},
+        );
+        let fake = FakeLanguageServer {
+            server: Arc::new(Self::new_internal(
+                0,
+                stdout_writer,
+                stdin_reader,
+                Path::new("/"),
+                cx.clone(),
+                move |msg| {
+                    notifications_tx
+                        .try_send((msg.method.to_string(), msg.params.get().to_string()))
+                        .ok();
+                },
+            )),
+            notifications_rx,
+        };
         fake.handle_request::<request::Initialize, _, _>({
             let capabilities = capabilities.clone();
             move |_, _| {
                 let capabilities = capabilities.clone();
                 async move {
-                    InitializeResult {
+                    Ok(InitializeResult {
                         capabilities,
                         ..Default::default()
-                    }
+                    })
                 }
             }
         });
 
-        let executor = cx.background().clone();
-        let server = Self::new_internal(
-            0,
-            stdin_writer,
-            stdout_reader,
-            Path::new("/"),
-            None,
-            executor,
-        );
         (server, fake)
     }
 }
 
 #[cfg(any(test, feature = "test-support"))]
 impl FakeLanguageServer {
-    fn new(
-        stdin: async_pipe::PipeReader,
-        stdout: async_pipe::PipeWriter,
-        cx: &mut gpui::MutableAppContext,
-    ) -> Self {
-        use futures::StreamExt as _;
-
-        let (incoming_tx, incoming_rx) = futures::channel::mpsc::unbounded();
-        let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded();
-        let handlers = FakeLanguageServerHandlers::default();
-
-        let input_task = cx.spawn(|cx| {
-            let handlers = handlers.clone();
-            let outgoing_tx = outgoing_tx.clone();
-            async move {
-                let mut buffer = Vec::new();
-                let mut stdin = smol::io::BufReader::new(stdin);
-                while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
-                    cx.background().simulate_random_delay().await;
-
-                    if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
-                        assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
-
-                        let response;
-                        if let Some(handler) = handlers.lock().get_mut(request.method) {
-                            response =
-                                handler(request.id, request.params.get().as_bytes(), cx.clone())
-                                    .await;
-                            log::debug!("handled lsp request. method:{}", request.method);
-                        } else {
-                            response = serde_json::to_vec(&AnyResponse {
-                                id: request.id,
-                                error: Some(Error {
-                                    message: "no handler".to_string(),
-                                }),
-                                result: None,
-                            })
-                            .unwrap();
-                            log::debug!("unhandled lsp request. method:{}", request.method);
-                        }
-                        outgoing_tx.unbounded_send(response)?;
-                    } else {
-                        incoming_tx.unbounded_send(buffer.clone())?;
-                    }
-                }
-                Ok::<_, anyhow::Error>(())
-            }
-        });
-
-        let output_task = cx.background().spawn(async move {
-            let mut stdout = smol::io::BufWriter::new(stdout);
-            while let Some(message) = outgoing_rx.next().await {
-                stdout.write_all(CONTENT_LEN_HEADER.as_bytes()).await?;
-                stdout
-                    .write_all((format!("{}", message.len())).as_bytes())
-                    .await?;
-                stdout.write_all("\r\n\r\n".as_bytes()).await?;
-                stdout.write_all(&message).await?;
-                stdout.flush().await?;
-            }
-            Ok(())
-        });
-
-        Self {
-            outgoing_tx,
-            incoming_rx,
-            handlers,
-            _input_task: input_task,
-            _output_task: output_task,
-        }
-    }
-
-    pub fn notify<T: notification::Notification>(&mut self, params: T::Params) {
-        let message = serde_json::to_vec(&Notification {
-            jsonrpc: JSON_RPC_VERSION,
-            method: T::METHOD,
-            params,
-        })
-        .unwrap();
-        self.outgoing_tx.unbounded_send(message).unwrap();
+    pub fn notify<T: notification::Notification>(&self, params: T::Params) {
+        self.server.notify::<T>(params).ok();
     }
 
     pub async fn receive_notification<T: notification::Notification>(&mut self) -> T::Params {
         use futures::StreamExt as _;
 
         loop {
-            let bytes = self.incoming_rx.next().await.unwrap();
-            if let Ok(notification) = serde_json::from_slice::<Notification<T::Params>>(&bytes) {
-                assert_eq!(notification.method, T::METHOD);
-                return notification.params;
+            let (method, params) = self.notifications_rx.next().await.unwrap();
+            if &method == T::METHOD {
+                return serde_json::from_str::<T::Params>(&params).unwrap();
             } else {
-                log::info!(
-                    "skipping message in fake language server {:?}",
-                    std::str::from_utf8(&bytes)
-                );
+                log::info!("skipping message in fake language server {:?}", params);
             }
         }
     }
 
     pub fn handle_request<T, F, Fut>(
-        &mut self,
+        &self,
         mut handler: F,
     ) -> futures::channel::mpsc::UnboundedReceiver<()>
     where
         T: 'static + request::Request,
+        T::Params: 'static + Send,
         F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> Fut,
-        Fut: 'static + Send + Future<Output = T::Result>,
+        Fut: 'static + Send + Future<Output = Result<T::Result>>,
     {
-        use futures::FutureExt as _;
-
         let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
-        self.handlers.lock().insert(
-            T::METHOD,
-            Box::new(move |id, params, cx| {
-                let result = handler(serde_json::from_slice::<T::Params>(params).unwrap(), cx);
+        self.server.remove_request_handler::<T>();
+        self.server
+            .on_request::<T, _, _>(move |params, cx| {
+                let result = handler(params, cx.clone());
                 let responded_tx = responded_tx.clone();
                 async move {
+                    cx.background().simulate_random_delay().await;
                     let result = result.await;
-                    let result = serde_json::to_string(&result).unwrap();
-                    let result = serde_json::from_str::<&RawValue>(&result).unwrap();
-                    let response = AnyResponse {
-                        id,
-                        error: None,
-                        result: Some(result),
-                    };
                     responded_tx.unbounded_send(()).ok();
-                    serde_json::to_vec(&response).unwrap()
+                    result
                 }
-                .boxed()
-            }),
-        );
+            })
+            .detach();
         responded_rx
     }
 
@@ -769,7 +682,7 @@ impl FakeLanguageServer {
     where
         T: 'static + request::Request,
     {
-        self.handlers.lock().remove(T::METHOD);
+        self.server.remove_request_handler::<T>();
     }
 
     pub async fn start_progress(&mut self, token: impl Into<String>) {
@@ -785,25 +698,6 @@ impl FakeLanguageServer {
             value: ProgressParamsValue::WorkDone(WorkDoneProgress::End(Default::default())),
         });
     }
-
-    async fn receive(
-        stdin: &mut smol::io::BufReader<async_pipe::PipeReader>,
-        buffer: &mut Vec<u8>,
-    ) -> Result<()> {
-        buffer.clear();
-        stdin.read_until(b'\n', buffer).await?;
-        stdin.read_until(b'\n', buffer).await?;
-        let message_len: usize = std::str::from_utf8(buffer)
-            .unwrap()
-            .strip_prefix(CONTENT_LEN_HEADER)
-            .ok_or_else(|| anyhow!("invalid content length header"))?
-            .trim_end()
-            .parse()
-            .unwrap();
-        buffer.resize(message_len, 0);
-        stdin.read_exact(buffer).await?;
-        Ok(())
-    }
 }
 
 struct ClearResponseHandlers(Arc<Mutex<HashMap<usize, ResponseHandler>>>);
@@ -828,22 +722,22 @@ mod tests {
 
     #[gpui::test]
     async fn test_fake(cx: &mut TestAppContext) {
-        let (mut server, mut fake) = cx.update(LanguageServer::fake);
+        let (server, mut fake) = LanguageServer::fake(cx.to_async());
 
         let (message_tx, message_rx) = channel::unbounded();
         let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
         server
-            .on_notification::<notification::ShowMessage, _>(move |params| {
+            .on_notification::<notification::ShowMessage, _>(move |params, _| {
                 message_tx.try_send(params).unwrap()
             })
             .detach();
         server
-            .on_notification::<notification::PublishDiagnostics, _>(move |params| {
+            .on_notification::<notification::PublishDiagnostics, _>(move |params, _| {
                 diagnostics_tx.try_send(params).unwrap()
             })
             .detach();
 
-        let server = server.initialize().await.unwrap();
+        let server = server.initialize(None).await.unwrap();
         server
             .notify::<notification::DidOpenTextDocument>(DidOpenTextDocumentParams {
                 text_document: TextDocumentItem::new(
@@ -878,7 +772,7 @@ mod tests {
             "file://b/c"
         );
 
-        fake.handle_request::<request::Shutdown, _, _>(|_, _| async move {});
+        fake.handle_request::<request::Shutdown, _, _>(|_, _| async move { Ok(()) });
 
         drop(server);
         fake.receive_notification::<notification::Exit>().await;

crates/project/src/project.rs 🔗

@@ -1325,7 +1325,7 @@ impl Project {
                     cx,
                 );
                 cx.spawn_weak(|this, mut cx| async move {
-                    let mut language_server = language_server?.await.log_err()?;
+                    let language_server = language_server?.await.log_err()?;
                     let this = this.upgrade(&cx)?;
                     let (language_server_events_tx, language_server_events_rx) =
                         smol::channel::unbounded();
@@ -1333,7 +1333,7 @@ impl Project {
                     language_server
                         .on_notification::<lsp::notification::PublishDiagnostics, _>({
                             let language_server_events_tx = language_server_events_tx.clone();
-                            move |params| {
+                            move |params, _| {
                                 language_server_events_tx
                                     .try_send(LanguageServerEvent::DiagnosticsUpdate(params))
                                     .ok();
@@ -1342,31 +1342,33 @@ impl Project {
                         .detach();
 
                     language_server
-                        .on_request::<lsp::request::WorkspaceConfiguration, _>({
+                        .on_request::<lsp::request::WorkspaceConfiguration, _, _>({
                             let settings = this
                                 .read_with(&cx, |this, _| this.language_server_settings.clone());
-                            move |params| {
-                                let settings = settings.lock();
-                                Ok(params
-                                    .items
-                                    .into_iter()
-                                    .map(|item| {
-                                        if let Some(section) = &item.section {
-                                            settings
-                                                .get(section)
-                                                .cloned()
-                                                .unwrap_or(serde_json::Value::Null)
-                                        } else {
-                                            settings.clone()
-                                        }
-                                    })
-                                    .collect())
+                            move |params, _| {
+                                let settings = settings.lock().clone();
+                                async move {
+                                    Ok(params
+                                        .items
+                                        .into_iter()
+                                        .map(|item| {
+                                            if let Some(section) = &item.section {
+                                                settings
+                                                    .get(section)
+                                                    .cloned()
+                                                    .unwrap_or(serde_json::Value::Null)
+                                            } else {
+                                                settings.clone()
+                                            }
+                                        })
+                                        .collect())
+                                }
                             }
                         })
                         .detach();
 
                     language_server
-                        .on_notification::<lsp::notification::Progress, _>(move |params| {
+                        .on_notification::<lsp::notification::Progress, _>(move |params, _| {
                             let token = match params.token {
                                 lsp::NumberOrString::String(token) => token,
                                 lsp::NumberOrString::Number(token) => {
@@ -1406,6 +1408,11 @@ impl Project {
                         })
                         .detach();
 
+                    let language_server = language_server
+                        .initialize(adapter.initialization_options())
+                        .await
+                        .log_err()?;
+
                     // Process all the LSP events.
                     cx.spawn(|mut cx| {
                         let this = this.downgrade();
@@ -1424,7 +1431,6 @@ impl Project {
                     })
                     .detach();
 
-                    let language_server = language_server.initialize().await.log_err()?;
                     this.update(&mut cx, |this, cx| {
                         this.language_servers
                             .insert(key.clone(), (adapter, language_server.clone()));
@@ -4974,9 +4980,9 @@ mod tests {
         });
 
         let mut rust_shutdown_requests = fake_rust_server
-            .handle_request::<lsp::request::Shutdown, _, _>(|_, _| future::ready(()));
+            .handle_request::<lsp::request::Shutdown, _, _>(|_, _| future::ready(Ok(())));
         let mut json_shutdown_requests = fake_json_server
-            .handle_request::<lsp::request::Shutdown, _, _>(|_, _| future::ready(()));
+            .handle_request::<lsp::request::Shutdown, _, _>(|_, _| future::ready(Ok(())));
         futures::join!(rust_shutdown_requests.next(), json_shutdown_requests.next());
 
         let mut fake_rust_server = fake_rust_servers.next().await.unwrap();
@@ -5917,19 +5923,11 @@ mod tests {
             .await;
 
         let buffer = project
-            .update(cx, |project, cx| {
-                project.open_buffer(
-                    ProjectPath {
-                        worktree_id,
-                        path: Path::new("").into(),
-                    },
-                    cx,
-                )
-            })
+            .update(cx, |project, cx| project.open_buffer((worktree_id, ""), cx))
             .await
             .unwrap();
 
-        let mut fake_server = fake_servers.next().await.unwrap();
+        let fake_server = fake_servers.next().await.unwrap();
         fake_server.handle_request::<lsp::request::GotoDefinition, _, _>(|params, _| async move {
             let params = params.text_document_position_params;
             assert_eq!(
@@ -5938,9 +5936,11 @@ mod tests {
             );
             assert_eq!(params.position, lsp::Position::new(0, 22));
 
-            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
-                lsp::Url::from_file_path("/dir/a.rs").unwrap(),
-                lsp::Range::new(lsp::Position::new(0, 9), lsp::Position::new(0, 10)),
+            Ok(Some(lsp::GotoDefinitionResponse::Scalar(
+                lsp::Location::new(
+                    lsp::Url::from_file_path("/dir/a.rs").unwrap(),
+                    lsp::Range::new(lsp::Position::new(0, 9), lsp::Position::new(0, 10)),
+                ),
             )))
         });
 
@@ -6854,7 +6854,7 @@ mod tests {
             .await
             .unwrap();
 
-        let mut fake_server = fake_servers.next().await.unwrap();
+        let fake_server = fake_servers.next().await.unwrap();
 
         let response = project.update(cx, |project, cx| {
             project.prepare_rename(buffer.clone(), 7, cx)
@@ -6863,10 +6863,10 @@ mod tests {
             .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
                 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
                 assert_eq!(params.position, lsp::Position::new(0, 7));
-                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
+                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
                     lsp::Position::new(0, 6),
                     lsp::Position::new(0, 9),
-                )))
+                ))))
             })
             .next()
             .await
@@ -6889,7 +6889,7 @@ mod tests {
                     lsp::Position::new(0, 7)
                 );
                 assert_eq!(params.new_name, "THREE");
-                Some(lsp::WorkspaceEdit {
+                Ok(Some(lsp::WorkspaceEdit {
                     changes: Some(
                         [
                             (
@@ -6926,7 +6926,7 @@ mod tests {
                         .collect(),
                     ),
                     ..Default::default()
-                })
+                }))
             })
             .next()
             .await

crates/server/src/rpc.rs 🔗

@@ -2342,7 +2342,7 @@ mod tests {
             Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
         });
 
-        let mut fake_language_server = fake_language_servers.next().await.unwrap();
+        let fake_language_server = fake_language_servers.next().await.unwrap();
         buffer_b
             .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
             .await;
@@ -2368,7 +2368,7 @@ mod tests {
                     lsp::Position::new(0, 14),
                 );
 
-                Some(lsp::CompletionResponse::Array(vec![
+                Ok(Some(lsp::CompletionResponse::Array(vec![
                     lsp::CompletionItem {
                         label: "first_method(…)".into(),
                         detail: Some("fn(&mut self, B) -> C".into()),
@@ -2395,7 +2395,7 @@ mod tests {
                         insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
                         ..Default::default()
                     },
-                ]))
+                ])))
             })
             .next()
             .await
@@ -2425,7 +2425,7 @@ mod tests {
         fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
             |params, _| async move {
                 assert_eq!(params.label, "first_method(…)");
-                lsp::CompletionItem {
+                Ok(lsp::CompletionItem {
                     label: "first_method(…)".into(),
                     detail: Some("fn(&mut self, B) -> C".into()),
                     text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
@@ -2441,7 +2441,7 @@ mod tests {
                     }]),
                     insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
                     ..Default::default()
-                }
+                })
             },
         );
 
@@ -2530,9 +2530,9 @@ mod tests {
             .await
             .unwrap();
 
-        let mut fake_language_server = fake_language_servers.next().await.unwrap();
+        let fake_language_server = fake_language_servers.next().await.unwrap();
         fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
-            Some(vec![
+            Ok(Some(vec![
                 lsp::TextEdit {
                     range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
                     new_text: "h".to_string(),
@@ -2541,7 +2541,7 @@ mod tests {
                     range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
                     new_text: "y".to_string(),
                 },
-            ])
+            ]))
         });
 
         project_b
@@ -2637,12 +2637,14 @@ mod tests {
             .unwrap();
 
         // Request the definition of a symbol as the guest.
-        let mut fake_language_server = fake_language_servers.next().await.unwrap();
+        let fake_language_server = fake_language_servers.next().await.unwrap();
         fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
             |_, _| async move {
-                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
-                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
-                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
+                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
+                    lsp::Location::new(
+                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
+                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
+                    ),
                 )))
             },
         );
@@ -2669,9 +2671,11 @@ mod tests {
         // the previous call to `definition`.
         fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
             |_, _| async move {
-                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
-                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
-                    lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
+                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
+                    lsp::Location::new(
+                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
+                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
+                    ),
                 )))
             },
         );
@@ -2778,14 +2782,14 @@ mod tests {
             .unwrap();
 
         // Request references to a symbol as the guest.
-        let mut fake_language_server = fake_language_servers.next().await.unwrap();
+        let fake_language_server = fake_language_servers.next().await.unwrap();
         fake_language_server.handle_request::<lsp::request::References, _, _>(
             |params, _| async move {
                 assert_eq!(
                     params.text_document_position.text_document.uri.as_str(),
                     "file:///root-1/one.rs"
                 );
-                Some(vec![
+                Ok(Some(vec![
                     lsp::Location {
                         uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
                         range: lsp::Range::new(
@@ -2807,7 +2811,7 @@ mod tests {
                             lsp::Position::new(0, 40),
                         ),
                     },
-                ])
+                ]))
             },
         );
 
@@ -3018,7 +3022,7 @@ mod tests {
             .unwrap();
 
         // Request document highlights as the guest.
-        let mut fake_language_server = fake_language_servers.next().await.unwrap();
+        let fake_language_server = fake_language_servers.next().await.unwrap();
         fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
             |params, _| async move {
                 assert_eq!(
@@ -3033,7 +3037,7 @@ mod tests {
                     params.text_document_position_params.position,
                     lsp::Position::new(0, 34)
                 );
-                Some(vec![
+                Ok(Some(vec![
                     lsp::DocumentHighlight {
                         kind: Some(lsp::DocumentHighlightKind::WRITE),
                         range: lsp::Range::new(
@@ -3055,7 +3059,7 @@ mod tests {
                             lsp::Position::new(0, 47),
                         ),
                     },
-                ])
+                ]))
             },
         );
 
@@ -3162,11 +3166,11 @@ mod tests {
             .await
             .unwrap();
 
-        let mut fake_language_server = fake_language_servers.next().await.unwrap();
+        let fake_language_server = fake_language_servers.next().await.unwrap();
         fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
             |_, _| async move {
                 #[allow(deprecated)]
-                Some(vec![lsp::SymbolInformation {
+                Ok(Some(vec![lsp::SymbolInformation {
                     name: "TWO".into(),
                     location: lsp::Location {
                         uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
@@ -3176,7 +3180,7 @@ mod tests {
                     tags: None,
                     container_name: None,
                     deprecated: None,
-                }])
+                }]))
             },
         );
 
@@ -3292,12 +3296,14 @@ mod tests {
             .await
             .unwrap();
 
-        let mut fake_language_server = fake_language_servers.next().await.unwrap();
+        let fake_language_server = fake_language_servers.next().await.unwrap();
         fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
             |_, _| async move {
-                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
-                    lsp::Url::from_file_path("/root/b.rs").unwrap(),
-                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
+                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
+                    lsp::Location::new(
+                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
+                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
+                    ),
                 )))
             },
         );
@@ -3413,7 +3419,7 @@ mod tests {
                 );
                 assert_eq!(params.range.start, lsp::Position::new(0, 0));
                 assert_eq!(params.range.end, lsp::Position::new(0, 0));
-                None
+                Ok(None)
             })
             .next()
             .await;
@@ -3433,7 +3439,7 @@ mod tests {
                 assert_eq!(params.range.start, lsp::Position::new(1, 31));
                 assert_eq!(params.range.end, lsp::Position::new(1, 31));
 
-                Some(vec![lsp::CodeActionOrCommand::CodeAction(
+                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
                     lsp::CodeAction {
                         title: "Inline into all callers".to_string(),
                         edit: Some(lsp::WorkspaceEdit {
@@ -3475,7 +3481,7 @@ mod tests {
                         })),
                         ..Default::default()
                     },
-                )])
+                )]))
             })
             .next()
             .await;
@@ -3498,7 +3504,7 @@ mod tests {
             .unwrap();
         fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
             |_, _| async move {
-                lsp::CodeAction {
+                Ok(lsp::CodeAction {
                     title: "Inline into all callers".to_string(),
                     edit: Some(lsp::WorkspaceEdit {
                         changes: Some(
@@ -3530,7 +3536,7 @@ mod tests {
                         ..Default::default()
                     }),
                     ..Default::default()
-                }
+                })
             },
         );
 
@@ -3637,7 +3643,7 @@ mod tests {
             .unwrap()
             .downcast::<Editor>()
             .unwrap();
-        let mut fake_language_server = fake_language_servers.next().await.unwrap();
+        let fake_language_server = fake_language_servers.next().await.unwrap();
 
         // Move cursor to a location that can be renamed.
         let prepare_rename = editor_b.update(cx_b, |editor, cx| {
@@ -3649,10 +3655,10 @@ mod tests {
             .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
                 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
                 assert_eq!(params.position, lsp::Position::new(0, 7));
-                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
+                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
                     lsp::Position::new(0, 6),
                     lsp::Position::new(0, 9),
-                )))
+                ))))
             })
             .next()
             .await
@@ -3686,7 +3692,7 @@ mod tests {
                     lsp::Position::new(0, 6)
                 );
                 assert_eq!(params.new_name, "THREE");
-                Some(lsp::WorkspaceEdit {
+                Ok(Some(lsp::WorkspaceEdit {
                     changes: Some(
                         [
                             (
@@ -3723,7 +3729,7 @@ mod tests {
                         .collect(),
                     ),
                     ..Default::default()
-                })
+                }))
             })
             .next()
             .await
@@ -4894,36 +4900,38 @@ mod tests {
                 move |fake_server: &mut FakeLanguageServer| {
                     fake_server.handle_request::<lsp::request::Completion, _, _>(
                         |_, _| async move {
-                            Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
-                                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
-                                    range: lsp::Range::new(
-                                        lsp::Position::new(0, 0),
-                                        lsp::Position::new(0, 0),
-                                    ),
-                                    new_text: "the-new-text".to_string(),
-                                })),
-                                ..Default::default()
-                            }]))
+                            Ok(Some(lsp::CompletionResponse::Array(vec![
+                                lsp::CompletionItem {
+                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
+                                        range: lsp::Range::new(
+                                            lsp::Position::new(0, 0),
+                                            lsp::Position::new(0, 0),
+                                        ),
+                                        new_text: "the-new-text".to_string(),
+                                    })),
+                                    ..Default::default()
+                                },
+                            ])))
                         },
                     );
 
                     fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
                         |_, _| async move {
-                            Some(vec![lsp::CodeActionOrCommand::CodeAction(
+                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
                                 lsp::CodeAction {
                                     title: "the-code-action".to_string(),
                                     ..Default::default()
                                 },
-                            )])
+                            )]))
                         },
                     );
 
                     fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
                         |params, _| async move {
-                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
+                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
                                 params.position,
                                 params.position,
-                            )))
+                            ))))
                         },
                     );
 
@@ -4941,7 +4949,7 @@ mod tests {
                                     .map(|_| files.choose(&mut *rng).unwrap())
                                     .collect::<Vec<_>>();
                                 log::info!("LSP: Returning definitions in files {:?}", &files);
-                                Some(lsp::GotoDefinitionResponse::Array(
+                                Ok(Some(lsp::GotoDefinitionResponse::Array(
                                     files
                                         .into_iter()
                                         .map(|file| lsp::Location {
@@ -4949,7 +4957,7 @@ mod tests {
                                             range: Default::default(),
                                         })
                                         .collect(),
-                                ))
+                                )))
                             }
                         }
                     });
@@ -4991,7 +4999,7 @@ mod tests {
                             } else {
                                 None
                             };
-                            async move { highlights }
+                            async move { Ok(highlights) }
                         }
                     });
                 }