From 5dcb90858effc47c7f2768b03ddb2a81b443ec8e Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Thu, 21 Aug 2025 09:24:34 +0300 Subject: [PATCH] Stop waiting for part of LSP responses on remote Collab clients' part (#36557) Instead of holding a connection for potentially long LSP queries (e.g. rust-analyzer might take minutes to look up a definition), disconnect right after sending the initial request and handle the follow-up responses later. As a bonus, this allows to cancel previously sent request on the local Collab clients' side due to this, as instead of holding and serving the old connection, local clients now can stop previous requests, if needed. Current PR does not convert all LSP requests to the new paradigm, but the problematic ones, deprecating `MultiLspQuery` and moving all its requests to the new paradigm. Release Notes: - Improved resource usage when querying LSP over Collab --------- Co-authored-by: David Kleingeld Co-authored-by: Mikayla Maki Co-authored-by: David Kleingeld --- crates/agent_ui/src/acp/message_editor.rs | 8 +- crates/collab/src/rpc.rs | 20 + crates/collab/src/tests/editor_tests.rs | 208 ++- crates/collab/src/tests/integration_tests.rs | 12 +- crates/editor/src/editor.rs | 24 +- crates/editor/src/hover_links.rs | 2 +- crates/editor/src/hover_popover.rs | 2 +- crates/editor/src/proposed_changes_editor.rs | 4 +- crates/editor/src/signature_help.rs | 4 +- crates/lsp/src/lsp.rs | 2 +- crates/project/src/lsp_command.rs | 3 +- crates/project/src/lsp_store.rs | 1202 ++++++++++-------- crates/project/src/project.rs | 49 +- crates/project/src/project_tests.rs | 8 +- crates/proto/proto/lsp.proto | 91 +- crates/proto/proto/zed.proto | 5 +- crates/proto/src/macros.rs | 29 + crates/proto/src/proto.rs | 45 + crates/proto/src/typed_envelope.rs | 52 + crates/rpc/src/proto_client.rs | 304 ++++- 20 files changed, 1394 insertions(+), 680 deletions(-) diff --git a/crates/agent_ui/src/acp/message_editor.rs b/crates/agent_ui/src/acp/message_editor.rs index be133808b7731961f2f5f6cbd4197f64159b713f..1155285d09e487d5fb87e6a62cefae9bd2aac7e0 100644 --- a/crates/agent_ui/src/acp/message_editor.rs +++ b/crates/agent_ui/src/acp/message_editor.rs @@ -1691,7 +1691,7 @@ impl SemanticsProvider for SlashCommandSemanticsProvider { buffer: &Entity, position: text::Anchor, cx: &mut App, - ) -> Option>> { + ) -> Option>>> { let snapshot = buffer.read(cx).snapshot(); let offset = position.to_offset(&snapshot); let (start, end) = self.range.get()?; @@ -1699,14 +1699,14 @@ impl SemanticsProvider for SlashCommandSemanticsProvider { return None; } let range = snapshot.anchor_after(start)..snapshot.anchor_after(end); - Some(Task::ready(vec![project::Hover { + Some(Task::ready(Some(vec![project::Hover { contents: vec![project::HoverBlock { text: "Slash commands are not supported".into(), kind: project::HoverBlockKind::PlainText, }], range: Some(range), language: None, - }])) + }]))) } fn inline_values( @@ -1756,7 +1756,7 @@ impl SemanticsProvider for SlashCommandSemanticsProvider { _position: text::Anchor, _kind: editor::GotoDefinitionKind, _cx: &mut App, - ) -> Option>>> { + ) -> Option>>>> { None } diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 06eb68610f0b0b97f42b54ae00c54754de646b0a..73f327166a3f1fb40a1f232ea2fabcdedd3fb129 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -400,6 +400,8 @@ impl Server { .add_request_handler(forward_mutating_project_request::) .add_request_handler(forward_mutating_project_request::) .add_request_handler(multi_lsp_query) + .add_request_handler(lsp_query) + .add_message_handler(broadcast_project_message_from_host::) .add_request_handler(forward_mutating_project_request::) .add_request_handler(forward_mutating_project_request::) .add_request_handler(forward_mutating_project_request::) @@ -910,7 +912,9 @@ impl Server { user_id=field::Empty, login=field::Empty, impersonator=field::Empty, + // todo(lsp) remove after Zed Stable hits v0.204.x multi_lsp_query_request=field::Empty, + lsp_query_request=field::Empty, release_channel=field::Empty, { TOTAL_DURATION_MS }=field::Empty, { PROCESSING_DURATION_MS }=field::Empty, @@ -2356,6 +2360,7 @@ where Ok(()) } +// todo(lsp) remove after Zed Stable hits v0.204.x async fn multi_lsp_query( request: MultiLspQuery, response: Response, @@ -2366,6 +2371,21 @@ async fn multi_lsp_query( forward_mutating_project_request(request, response, session).await } +async fn lsp_query( + request: proto::LspQuery, + response: Response, + session: MessageContext, +) -> Result<()> { + let (name, should_write) = request.query_name_and_write_permissions(); + tracing::Span::current().record("lsp_query_request", name); + tracing::info!("lsp_query message received"); + if should_write { + forward_mutating_project_request(request, response, session).await + } else { + forward_read_only_project_request(request, response, session).await + } +} + /// Notify other participants that a new buffer has been created async fn create_buffer_for_peer( request: proto::CreateBufferForPeer, diff --git a/crates/collab/src/tests/editor_tests.rs b/crates/collab/src/tests/editor_tests.rs index 1b0c581983ac54e9fdea074947bd8eaae4764c81..59d66f1821e60ecbf3a7550c1385fa6de7ae047d 100644 --- a/crates/collab/src/tests/editor_tests.rs +++ b/crates/collab/src/tests/editor_tests.rs @@ -15,13 +15,14 @@ use editor::{ }, }; use fs::Fs; -use futures::{StreamExt, lock::Mutex}; +use futures::{SinkExt, StreamExt, channel::mpsc, lock::Mutex}; use gpui::{App, Rgba, TestAppContext, UpdateGlobal, VisualContext, VisualTestContext}; use indoc::indoc; use language::{ FakeLspAdapter, language_settings::{AllLanguageSettings, InlayHintSettings}, }; +use lsp::LSP_REQUEST_TIMEOUT; use project::{ ProjectPath, SERVER_PROGRESS_THROTTLE_TIMEOUT, lsp_store::lsp_ext_command::{ExpandedMacro, LspExtExpandMacro}, @@ -1017,6 +1018,211 @@ async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut T }) } +#[gpui::test] +async fn test_slow_lsp_server(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { + let mut server = TestServer::start(cx_a.executor()).await; + let client_a = server.create_client(cx_a, "user_a").await; + let client_b = server.create_client(cx_b, "user_b").await; + server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + .await; + let active_call_a = cx_a.read(ActiveCall::global); + cx_b.update(editor::init); + + let command_name = "test_command"; + let capabilities = lsp::ServerCapabilities { + code_lens_provider: Some(lsp::CodeLensOptions { + resolve_provider: None, + }), + execute_command_provider: Some(lsp::ExecuteCommandOptions { + commands: vec![command_name.to_string()], + ..lsp::ExecuteCommandOptions::default() + }), + ..lsp::ServerCapabilities::default() + }; + client_a.language_registry().add(rust_lang()); + let mut fake_language_servers = client_a.language_registry().register_fake_lsp( + "Rust", + FakeLspAdapter { + capabilities: capabilities.clone(), + ..FakeLspAdapter::default() + }, + ); + client_b.language_registry().add(rust_lang()); + client_b.language_registry().register_fake_lsp_adapter( + "Rust", + FakeLspAdapter { + capabilities, + ..FakeLspAdapter::default() + }, + ); + + client_a + .fs() + .insert_tree( + path!("/dir"), + json!({ + "one.rs": "const ONE: usize = 1;" + }), + ) + .await; + let (project_a, worktree_id) = client_a.build_local_project(path!("/dir"), cx_a).await; + let project_id = active_call_a + .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx)) + .await + .unwrap(); + let project_b = client_b.join_remote_project(project_id, cx_b).await; + + let (workspace_b, cx_b) = client_b.build_workspace(&project_b, cx_b); + let editor_b = workspace_b + .update_in(cx_b, |workspace, window, cx| { + workspace.open_path((worktree_id, "one.rs"), None, true, window, cx) + }) + .await + .unwrap() + .downcast::() + .unwrap(); + let (lsp_store_b, buffer_b) = editor_b.update(cx_b, |editor, cx| { + let lsp_store = editor.project().unwrap().read(cx).lsp_store(); + let buffer = editor.buffer().read(cx).as_singleton().unwrap(); + (lsp_store, buffer) + }); + let fake_language_server = fake_language_servers.next().await.unwrap(); + cx_a.run_until_parked(); + cx_b.run_until_parked(); + + let long_request_time = LSP_REQUEST_TIMEOUT / 2; + let (request_started_tx, mut request_started_rx) = mpsc::unbounded(); + let requests_started = Arc::new(AtomicUsize::new(0)); + let requests_completed = Arc::new(AtomicUsize::new(0)); + let _lens_requests = fake_language_server + .set_request_handler::({ + let request_started_tx = request_started_tx.clone(); + let requests_started = requests_started.clone(); + let requests_completed = requests_completed.clone(); + move |params, cx| { + let mut request_started_tx = request_started_tx.clone(); + let requests_started = requests_started.clone(); + let requests_completed = requests_completed.clone(); + async move { + assert_eq!( + params.text_document.uri.as_str(), + uri!("file:///dir/one.rs") + ); + requests_started.fetch_add(1, atomic::Ordering::Release); + request_started_tx.send(()).await.unwrap(); + cx.background_executor().timer(long_request_time).await; + let i = requests_completed.fetch_add(1, atomic::Ordering::Release) + 1; + Ok(Some(vec![lsp::CodeLens { + range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 9)), + command: Some(lsp::Command { + title: format!("LSP Command {i}"), + command: command_name.to_string(), + arguments: None, + }), + data: None, + }])) + } + } + }); + + // Move cursor to a location, this should trigger the code lens call. + editor_b.update_in(cx_b, |editor, window, cx| { + editor.change_selections(SelectionEffects::no_scroll(), window, cx, |s| { + s.select_ranges([7..7]) + }); + }); + let () = request_started_rx.next().await.unwrap(); + assert_eq!( + requests_started.load(atomic::Ordering::Acquire), + 1, + "Selection change should have initiated the first request" + ); + assert_eq!( + requests_completed.load(atomic::Ordering::Acquire), + 0, + "Slow requests should be running still" + ); + let _first_task = lsp_store_b.update(cx_b, |lsp_store, cx| { + lsp_store + .forget_code_lens_task(buffer_b.read(cx).remote_id()) + .expect("Should have the fetch task started") + }); + + editor_b.update_in(cx_b, |editor, window, cx| { + editor.change_selections(SelectionEffects::no_scroll(), window, cx, |s| { + s.select_ranges([1..1]) + }); + }); + let () = request_started_rx.next().await.unwrap(); + assert_eq!( + requests_started.load(atomic::Ordering::Acquire), + 2, + "Selection change should have initiated the second request" + ); + assert_eq!( + requests_completed.load(atomic::Ordering::Acquire), + 0, + "Slow requests should be running still" + ); + let _second_task = lsp_store_b.update(cx_b, |lsp_store, cx| { + lsp_store + .forget_code_lens_task(buffer_b.read(cx).remote_id()) + .expect("Should have the fetch task started for the 2nd time") + }); + + editor_b.update_in(cx_b, |editor, window, cx| { + editor.change_selections(SelectionEffects::no_scroll(), window, cx, |s| { + s.select_ranges([2..2]) + }); + }); + let () = request_started_rx.next().await.unwrap(); + assert_eq!( + requests_started.load(atomic::Ordering::Acquire), + 3, + "Selection change should have initiated the third request" + ); + assert_eq!( + requests_completed.load(atomic::Ordering::Acquire), + 0, + "Slow requests should be running still" + ); + + _first_task.await.unwrap(); + _second_task.await.unwrap(); + cx_b.run_until_parked(); + assert_eq!( + requests_started.load(atomic::Ordering::Acquire), + 3, + "No selection changes should trigger no more code lens requests" + ); + assert_eq!( + requests_completed.load(atomic::Ordering::Acquire), + 3, + "After enough time, all 3 LSP requests should have been served by the language server" + ); + let resulting_lens_actions = editor_b + .update(cx_b, |editor, cx| { + let lsp_store = editor.project().unwrap().read(cx).lsp_store(); + lsp_store.update(cx, |lsp_store, cx| { + lsp_store.code_lens_actions(&buffer_b, cx) + }) + }) + .await + .unwrap() + .unwrap(); + assert_eq!( + resulting_lens_actions.len(), + 1, + "Should have fetched one code lens action, but got: {resulting_lens_actions:?}" + ); + assert_eq!( + resulting_lens_actions.first().unwrap().lsp_action.title(), + "LSP Command 3", + "Only the final code lens action should be in the data" + ) +} + #[gpui::test(iterations = 10)] async fn test_language_server_statuses(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { let mut server = TestServer::start(cx_a.executor()).await; diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs index e01736f0ef9daced0d35742d08503ea8b188f733..5c732530480a14ab28e231aa0fae1b79ef2703fb 100644 --- a/crates/collab/src/tests/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -4850,6 +4850,7 @@ async fn test_definition( let definitions_1 = project_b .update(cx_b, |p, cx| p.definitions(&buffer_b, 23, cx)) .await + .unwrap() .unwrap(); cx_b.read(|cx| { assert_eq!( @@ -4885,6 +4886,7 @@ async fn test_definition( let definitions_2 = project_b .update(cx_b, |p, cx| p.definitions(&buffer_b, 33, cx)) .await + .unwrap() .unwrap(); cx_b.read(|cx| { assert_eq!(definitions_2.len(), 1); @@ -4922,6 +4924,7 @@ async fn test_definition( let type_definitions = project_b .update(cx_b, |p, cx| p.type_definitions(&buffer_b, 7, cx)) .await + .unwrap() .unwrap(); cx_b.read(|cx| { assert_eq!( @@ -5060,7 +5063,7 @@ async fn test_references( ]))) .unwrap(); - let references = references.await.unwrap(); + let references = references.await.unwrap().unwrap(); executor.run_until_parked(); project_b.read_with(cx_b, |project, cx| { // User is informed that a request is no longer pending. @@ -5104,7 +5107,7 @@ async fn test_references( lsp_response_tx .unbounded_send(Err(anyhow!("can't find references"))) .unwrap(); - assert_eq!(references.await.unwrap(), []); + assert_eq!(references.await.unwrap().unwrap(), []); // User is informed that the request is no longer pending. executor.run_until_parked(); @@ -5505,7 +5508,8 @@ async fn test_lsp_hover( // Request hover information as the guest. let mut hovers = project_b .update(cx_b, |p, cx| p.hover(&buffer_b, 22, cx)) - .await; + .await + .unwrap(); assert_eq!( hovers.len(), 2, @@ -5764,7 +5768,7 @@ async fn test_open_buffer_while_getting_definition_pointing_to_it( definitions = project_b.update(cx_b, |p, cx| p.definitions(&buffer_b1, 23, cx)); } - let definitions = definitions.await.unwrap(); + let definitions = definitions.await.unwrap().unwrap(); assert_eq!( definitions.len(), 1, diff --git a/crates/editor/src/editor.rs b/crates/editor/src/editor.rs index 25fddf5cf1d44f947e7c4128e82158787cdd28f2..e32ea1cb3a6a11a406fb34fa47e78699cd1e3595 100644 --- a/crates/editor/src/editor.rs +++ b/crates/editor/src/editor.rs @@ -15710,7 +15710,9 @@ impl Editor { }; cx.spawn_in(window, async move |editor, cx| { - let definitions = definitions.await?; + let Some(definitions) = definitions.await? else { + return Ok(Navigated::No); + }; let navigated = editor .update_in(cx, |editor, window, cx| { editor.navigate_to_hover_links( @@ -16052,7 +16054,9 @@ impl Editor { } }); - let locations = references.await?; + let Some(locations) = references.await? else { + return anyhow::Ok(Navigated::No); + }; if locations.is_empty() { return anyhow::Ok(Navigated::No); } @@ -21837,7 +21841,7 @@ pub trait SemanticsProvider { buffer: &Entity, position: text::Anchor, cx: &mut App, - ) -> Option>>; + ) -> Option>>>; fn inline_values( &self, @@ -21876,7 +21880,7 @@ pub trait SemanticsProvider { position: text::Anchor, kind: GotoDefinitionKind, cx: &mut App, - ) -> Option>>>; + ) -> Option>>>>; fn range_for_rename( &self, @@ -21989,7 +21993,13 @@ impl CodeActionProvider for Entity { Ok(code_lens_actions .context("code lens fetch")? .into_iter() - .chain(code_actions.context("code action fetch")?) + .flatten() + .chain( + code_actions + .context("code action fetch")? + .into_iter() + .flatten(), + ) .collect()) }) }) @@ -22284,7 +22294,7 @@ impl SemanticsProvider for Entity { buffer: &Entity, position: text::Anchor, cx: &mut App, - ) -> Option>> { + ) -> Option>>> { Some(self.update(cx, |project, cx| project.hover(buffer, position, cx))) } @@ -22305,7 +22315,7 @@ impl SemanticsProvider for Entity { position: text::Anchor, kind: GotoDefinitionKind, cx: &mut App, - ) -> Option>>> { + ) -> Option>>>> { Some(self.update(cx, |project, cx| match kind { GotoDefinitionKind::Symbol => project.definitions(buffer, position, cx), GotoDefinitionKind::Declaration => project.declarations(buffer, position, cx), diff --git a/crates/editor/src/hover_links.rs b/crates/editor/src/hover_links.rs index 04e66a234c0b16131b492264eb9e798e76b24453..1d7d56e67db00e3511c1bf8203f6e757cf2aea6b 100644 --- a/crates/editor/src/hover_links.rs +++ b/crates/editor/src/hover_links.rs @@ -559,7 +559,7 @@ pub fn show_link_definition( provider.definitions(&buffer, buffer_position, preferred_kind, cx) })?; if let Some(task) = task { - task.await.ok().map(|definition_result| { + task.await.ok().flatten().map(|definition_result| { ( definition_result.iter().find_map(|link| { link.origin.as_ref().and_then(|origin| { diff --git a/crates/editor/src/hover_popover.rs b/crates/editor/src/hover_popover.rs index 28a09e947f58ef26f20453e9f36f01e7cd74061e..fab53457876866223be6b7d32f964cd1abd1dd28 100644 --- a/crates/editor/src/hover_popover.rs +++ b/crates/editor/src/hover_popover.rs @@ -428,7 +428,7 @@ fn show_hover( }; let hovers_response = if let Some(hover_request) = hover_request { - hover_request.await + hover_request.await.unwrap_or_default() } else { Vec::new() }; diff --git a/crates/editor/src/proposed_changes_editor.rs b/crates/editor/src/proposed_changes_editor.rs index c79feccb4b1fb0ef7ad686408358e77319ce446c..2d4710a8d44a023f0c3206ad0c327a34c36fdac4 100644 --- a/crates/editor/src/proposed_changes_editor.rs +++ b/crates/editor/src/proposed_changes_editor.rs @@ -431,7 +431,7 @@ impl SemanticsProvider for BranchBufferSemanticsProvider { buffer: &Entity, position: text::Anchor, cx: &mut App, - ) -> Option>> { + ) -> Option>>> { let buffer = self.to_base(buffer, &[position], cx)?; self.0.hover(&buffer, position, cx) } @@ -490,7 +490,7 @@ impl SemanticsProvider for BranchBufferSemanticsProvider { position: text::Anchor, kind: crate::GotoDefinitionKind, cx: &mut App, - ) -> Option>>> { + ) -> Option>>>> { let buffer = self.to_base(buffer, &[position], cx)?; self.0.definitions(&buffer, position, kind, cx) } diff --git a/crates/editor/src/signature_help.rs b/crates/editor/src/signature_help.rs index 5c9800ab55e5f1b53b941c205a2e5601f8f22524..cb21f35d7ed7556cf09f9e566286a10f8317ca6c 100644 --- a/crates/editor/src/signature_help.rs +++ b/crates/editor/src/signature_help.rs @@ -182,7 +182,9 @@ impl Editor { let signature_help = task.await; editor .update(cx, |editor, cx| { - let Some(mut signature_help) = signature_help.into_iter().next() else { + let Some(mut signature_help) = + signature_help.unwrap_or_default().into_iter().next() + else { editor .signature_help_state .hide(SignatureHelpHiddenBy::AutoClose); diff --git a/crates/lsp/src/lsp.rs b/crates/lsp/src/lsp.rs index ce9e2fe229c0aded6fac31c260e334445f987f03..942225d09837c206f54aa324f9b58ec214f92ba2 100644 --- a/crates/lsp/src/lsp.rs +++ b/crates/lsp/src/lsp.rs @@ -45,7 +45,7 @@ use util::{ConnectionResult, ResultExt, TryFutureExt, redact}; const JSON_RPC_VERSION: &str = "2.0"; const CONTENT_LEN_HEADER: &str = "Content-Length: "; -const LSP_REQUEST_TIMEOUT: Duration = Duration::from_secs(60 * 2); +pub const LSP_REQUEST_TIMEOUT: Duration = Duration::from_secs(60 * 2); const SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5); type NotificationHandler = Box, Value, &mut AsyncApp)>; diff --git a/crates/project/src/lsp_command.rs b/crates/project/src/lsp_command.rs index c90d85358a2a4d70ec95ad4c25177026cb2a173c..ce7a871d1a63107ab4908dccea68dd41d73a319f 100644 --- a/crates/project/src/lsp_command.rs +++ b/crates/project/src/lsp_command.rs @@ -3444,8 +3444,7 @@ impl LspCommand for GetCodeLens { capabilities .server_capabilities .code_lens_provider - .as_ref() - .is_some_and(|code_lens_options| code_lens_options.resolve_provider.unwrap_or(false)) + .is_some() } fn to_lsp( diff --git a/crates/project/src/lsp_store.rs b/crates/project/src/lsp_store.rs index 0b58009f37204fa3383bfc73683826aa3b8d7fb3..bcfd9d386b111dce3e1173fc99b131dfdf317b76 100644 --- a/crates/project/src/lsp_store.rs +++ b/crates/project/src/lsp_store.rs @@ -72,10 +72,11 @@ use lsp::{ AdapterServerCapabilities, CodeActionKind, CompletionContext, DiagnosticSeverity, DiagnosticTag, DidChangeWatchedFilesRegistrationOptions, Edit, FileOperationFilter, FileOperationPatternKind, FileOperationRegistrationOptions, FileRename, FileSystemWatcher, - LanguageServer, LanguageServerBinary, LanguageServerBinaryOptions, LanguageServerId, - LanguageServerName, LanguageServerSelector, LspRequestFuture, MessageActionItem, MessageType, - OneOf, RenameFilesParams, SymbolKind, TextDocumentSyncSaveOptions, TextEdit, WillRenameFiles, - WorkDoneProgressCancelParams, WorkspaceFolder, notification::DidRenameFiles, + LSP_REQUEST_TIMEOUT, LanguageServer, LanguageServerBinary, LanguageServerBinaryOptions, + LanguageServerId, LanguageServerName, LanguageServerSelector, LspRequestFuture, + MessageActionItem, MessageType, OneOf, RenameFilesParams, SymbolKind, + TextDocumentSyncSaveOptions, TextEdit, WillRenameFiles, WorkDoneProgressCancelParams, + WorkspaceFolder, notification::DidRenameFiles, }; use node_runtime::read_package_installed_version; use parking_lot::Mutex; @@ -84,7 +85,7 @@ use rand::prelude::*; use rpc::{ AnyProtoClient, - proto::{FromProto, ToProto}, + proto::{FromProto, LspRequestId, LspRequestMessage as _, ToProto}, }; use serde::Serialize; use settings::{Settings, SettingsLocation, SettingsStore}; @@ -92,7 +93,7 @@ use sha2::{Digest, Sha256}; use smol::channel::Sender; use snippet::Snippet; use std::{ - any::Any, + any::{Any, TypeId}, borrow::Cow, cell::RefCell, cmp::{Ordering, Reverse}, @@ -3490,6 +3491,7 @@ pub struct LspStore { pub(super) lsp_server_capabilities: HashMap, lsp_document_colors: HashMap, lsp_code_lens: HashMap, + running_lsp_requests: HashMap>)>, } #[derive(Debug, Default, Clone)] @@ -3499,7 +3501,7 @@ pub struct DocumentColors { } type DocumentColorTask = Shared>>>; -type CodeLensTask = Shared, Arc>>>; +type CodeLensTask = Shared>, Arc>>>; #[derive(Debug, Default)] struct DocumentColorData { @@ -3579,6 +3581,8 @@ struct CoreSymbol { impl LspStore { pub fn init(client: &AnyProtoClient) { + client.add_entity_request_handler(Self::handle_lsp_query); + client.add_entity_message_handler(Self::handle_lsp_query_response); client.add_entity_request_handler(Self::handle_multi_lsp_query); client.add_entity_request_handler(Self::handle_restart_language_servers); client.add_entity_request_handler(Self::handle_stop_language_servers); @@ -3758,6 +3762,7 @@ impl LspStore { lsp_server_capabilities: HashMap::default(), lsp_document_colors: HashMap::default(), lsp_code_lens: HashMap::default(), + running_lsp_requests: HashMap::default(), active_entry: None, _maintain_workspace_config, _maintain_buffer_languages: Self::maintain_buffer_languages(languages, cx), @@ -3819,6 +3824,7 @@ impl LspStore { lsp_server_capabilities: HashMap::default(), lsp_document_colors: HashMap::default(), lsp_code_lens: HashMap::default(), + running_lsp_requests: HashMap::default(), active_entry: None, _maintain_workspace_config, @@ -4381,8 +4387,6 @@ impl LspStore { } } - // TODO: remove MultiLspQuery: instead, the proto handler should pick appropriate server(s) - // Then, use `send_lsp_proto_request` or analogue for most of the LSP proto requests and inline this check inside fn is_capable_for_proto_request( &self, buffer: &Entity, @@ -5233,154 +5237,130 @@ impl LspStore { pub fn definitions( &mut self, - buffer_handle: &Entity, + buffer: &Entity, position: PointUtf16, cx: &mut Context, - ) -> Task>> { + ) -> Task>>> { if let Some((upstream_client, project_id)) = self.upstream_client() { let request = GetDefinitions { position }; - if !self.is_capable_for_proto_request(buffer_handle, &request, cx) { - return Task::ready(Ok(Vec::new())); + if !self.is_capable_for_proto_request(buffer, &request, cx) { + return Task::ready(Ok(None)); } - let request_task = upstream_client.request(proto::MultiLspQuery { - buffer_id: buffer_handle.read(cx).remote_id().into(), - version: serialize_version(&buffer_handle.read(cx).version()), + let request_task = upstream_client.request_lsp( project_id, - strategy: Some(proto::multi_lsp_query::Strategy::All( - proto::AllLanguageServers {}, - )), - request: Some(proto::multi_lsp_query::Request::GetDefinition( - request.to_proto(project_id, buffer_handle.read(cx)), - )), - }); - let buffer = buffer_handle.clone(); + LSP_REQUEST_TIMEOUT, + cx.background_executor().clone(), + request.to_proto(project_id, buffer.read(cx)), + ); + let buffer = buffer.clone(); cx.spawn(async move |weak_project, cx| { let Some(project) = weak_project.upgrade() else { - return Ok(Vec::new()); + return Ok(None); }; - let responses = request_task.await?.responses; - let actions = join_all( - responses - .into_iter() - .filter_map(|lsp_response| match lsp_response.response? { - proto::lsp_response::Response::GetDefinitionResponse(response) => { - Some(response) - } - unexpected => { - debug_panic!("Unexpected response: {unexpected:?}"); - None - } - }) - .map(|definitions_response| { - GetDefinitions { position }.response_from_proto( - definitions_response, - project.clone(), - buffer.clone(), - cx.clone(), - ) - }), - ) + let Some(responses) = request_task.await? else { + return Ok(None); + }; + let actions = join_all(responses.payload.into_iter().map(|response| { + GetDefinitions { position }.response_from_proto( + response.response, + project.clone(), + buffer.clone(), + cx.clone(), + ) + })) .await; - Ok(actions - .into_iter() - .collect::>>>()? - .into_iter() - .flatten() - .dedup() - .collect()) + Ok(Some( + actions + .into_iter() + .collect::>>>()? + .into_iter() + .flatten() + .dedup() + .collect(), + )) }) } else { let definitions_task = self.request_multiple_lsp_locally( - buffer_handle, + buffer, Some(position), GetDefinitions { position }, cx, ); cx.background_spawn(async move { - Ok(definitions_task - .await - .into_iter() - .flat_map(|(_, definitions)| definitions) - .dedup() - .collect()) + Ok(Some( + definitions_task + .await + .into_iter() + .flat_map(|(_, definitions)| definitions) + .dedup() + .collect(), + )) }) } } pub fn declarations( &mut self, - buffer_handle: &Entity, + buffer: &Entity, position: PointUtf16, cx: &mut Context, - ) -> Task>> { + ) -> Task>>> { if let Some((upstream_client, project_id)) = self.upstream_client() { let request = GetDeclarations { position }; - if !self.is_capable_for_proto_request(buffer_handle, &request, cx) { - return Task::ready(Ok(Vec::new())); + if !self.is_capable_for_proto_request(buffer, &request, cx) { + return Task::ready(Ok(None)); } - let request_task = upstream_client.request(proto::MultiLspQuery { - buffer_id: buffer_handle.read(cx).remote_id().into(), - version: serialize_version(&buffer_handle.read(cx).version()), + let request_task = upstream_client.request_lsp( project_id, - strategy: Some(proto::multi_lsp_query::Strategy::All( - proto::AllLanguageServers {}, - )), - request: Some(proto::multi_lsp_query::Request::GetDeclaration( - request.to_proto(project_id, buffer_handle.read(cx)), - )), - }); - let buffer = buffer_handle.clone(); + LSP_REQUEST_TIMEOUT, + cx.background_executor().clone(), + request.to_proto(project_id, buffer.read(cx)), + ); + let buffer = buffer.clone(); cx.spawn(async move |weak_project, cx| { let Some(project) = weak_project.upgrade() else { - return Ok(Vec::new()); + return Ok(None); }; - let responses = request_task.await?.responses; - let actions = join_all( - responses - .into_iter() - .filter_map(|lsp_response| match lsp_response.response? { - proto::lsp_response::Response::GetDeclarationResponse(response) => { - Some(response) - } - unexpected => { - debug_panic!("Unexpected response: {unexpected:?}"); - None - } - }) - .map(|declarations_response| { - GetDeclarations { position }.response_from_proto( - declarations_response, - project.clone(), - buffer.clone(), - cx.clone(), - ) - }), - ) + let Some(responses) = request_task.await? else { + return Ok(None); + }; + let actions = join_all(responses.payload.into_iter().map(|response| { + GetDeclarations { position }.response_from_proto( + response.response, + project.clone(), + buffer.clone(), + cx.clone(), + ) + })) .await; - Ok(actions - .into_iter() - .collect::>>>()? - .into_iter() - .flatten() - .dedup() - .collect()) + Ok(Some( + actions + .into_iter() + .collect::>>>()? + .into_iter() + .flatten() + .dedup() + .collect(), + )) }) } else { let declarations_task = self.request_multiple_lsp_locally( - buffer_handle, + buffer, Some(position), GetDeclarations { position }, cx, ); cx.background_spawn(async move { - Ok(declarations_task - .await - .into_iter() - .flat_map(|(_, declarations)| declarations) - .dedup() - .collect()) + Ok(Some( + declarations_task + .await + .into_iter() + .flat_map(|(_, declarations)| declarations) + .dedup() + .collect(), + )) }) } } @@ -5390,59 +5370,45 @@ impl LspStore { buffer: &Entity, position: PointUtf16, cx: &mut Context, - ) -> Task>> { + ) -> Task>>> { if let Some((upstream_client, project_id)) = self.upstream_client() { let request = GetTypeDefinitions { position }; if !self.is_capable_for_proto_request(buffer, &request, cx) { - return Task::ready(Ok(Vec::new())); + return Task::ready(Ok(None)); } - let request_task = upstream_client.request(proto::MultiLspQuery { - buffer_id: buffer.read(cx).remote_id().into(), - version: serialize_version(&buffer.read(cx).version()), + let request_task = upstream_client.request_lsp( project_id, - strategy: Some(proto::multi_lsp_query::Strategy::All( - proto::AllLanguageServers {}, - )), - request: Some(proto::multi_lsp_query::Request::GetTypeDefinition( - request.to_proto(project_id, buffer.read(cx)), - )), - }); + LSP_REQUEST_TIMEOUT, + cx.background_executor().clone(), + request.to_proto(project_id, buffer.read(cx)), + ); let buffer = buffer.clone(); cx.spawn(async move |weak_project, cx| { let Some(project) = weak_project.upgrade() else { - return Ok(Vec::new()); + return Ok(None); }; - let responses = request_task.await?.responses; - let actions = join_all( - responses - .into_iter() - .filter_map(|lsp_response| match lsp_response.response? { - proto::lsp_response::Response::GetTypeDefinitionResponse(response) => { - Some(response) - } - unexpected => { - debug_panic!("Unexpected response: {unexpected:?}"); - None - } - }) - .map(|type_definitions_response| { - GetTypeDefinitions { position }.response_from_proto( - type_definitions_response, - project.clone(), - buffer.clone(), - cx.clone(), - ) - }), - ) + let Some(responses) = request_task.await? else { + return Ok(None); + }; + let actions = join_all(responses.payload.into_iter().map(|response| { + GetTypeDefinitions { position }.response_from_proto( + response.response, + project.clone(), + buffer.clone(), + cx.clone(), + ) + })) .await; - Ok(actions - .into_iter() - .collect::>>>()? - .into_iter() - .flatten() - .dedup() - .collect()) + Ok(Some( + actions + .into_iter() + .collect::>>>()? + .into_iter() + .flatten() + .dedup() + .collect(), + )) }) } else { let type_definitions_task = self.request_multiple_lsp_locally( @@ -5452,12 +5418,14 @@ impl LspStore { cx, ); cx.background_spawn(async move { - Ok(type_definitions_task - .await - .into_iter() - .flat_map(|(_, type_definitions)| type_definitions) - .dedup() - .collect()) + Ok(Some( + type_definitions_task + .await + .into_iter() + .flat_map(|(_, type_definitions)| type_definitions) + .dedup() + .collect(), + )) }) } } @@ -5467,59 +5435,45 @@ impl LspStore { buffer: &Entity, position: PointUtf16, cx: &mut Context, - ) -> Task>> { + ) -> Task>>> { if let Some((upstream_client, project_id)) = self.upstream_client() { let request = GetImplementations { position }; if !self.is_capable_for_proto_request(buffer, &request, cx) { - return Task::ready(Ok(Vec::new())); + return Task::ready(Ok(None)); } - let request_task = upstream_client.request(proto::MultiLspQuery { - buffer_id: buffer.read(cx).remote_id().into(), - version: serialize_version(&buffer.read(cx).version()), + let request_task = upstream_client.request_lsp( project_id, - strategy: Some(proto::multi_lsp_query::Strategy::All( - proto::AllLanguageServers {}, - )), - request: Some(proto::multi_lsp_query::Request::GetImplementation( - request.to_proto(project_id, buffer.read(cx)), - )), - }); + LSP_REQUEST_TIMEOUT, + cx.background_executor().clone(), + request.to_proto(project_id, buffer.read(cx)), + ); let buffer = buffer.clone(); cx.spawn(async move |weak_project, cx| { let Some(project) = weak_project.upgrade() else { - return Ok(Vec::new()); + return Ok(None); }; - let responses = request_task.await?.responses; - let actions = join_all( - responses - .into_iter() - .filter_map(|lsp_response| match lsp_response.response? { - proto::lsp_response::Response::GetImplementationResponse(response) => { - Some(response) - } - unexpected => { - debug_panic!("Unexpected response: {unexpected:?}"); - None - } - }) - .map(|implementations_response| { - GetImplementations { position }.response_from_proto( - implementations_response, - project.clone(), - buffer.clone(), - cx.clone(), - ) - }), - ) + let Some(responses) = request_task.await? else { + return Ok(None); + }; + let actions = join_all(responses.payload.into_iter().map(|response| { + GetImplementations { position }.response_from_proto( + response.response, + project.clone(), + buffer.clone(), + cx.clone(), + ) + })) .await; - Ok(actions - .into_iter() - .collect::>>>()? - .into_iter() - .flatten() - .dedup() - .collect()) + Ok(Some( + actions + .into_iter() + .collect::>>>()? + .into_iter() + .flatten() + .dedup() + .collect(), + )) }) } else { let implementations_task = self.request_multiple_lsp_locally( @@ -5529,12 +5483,14 @@ impl LspStore { cx, ); cx.background_spawn(async move { - Ok(implementations_task - .await - .into_iter() - .flat_map(|(_, implementations)| implementations) - .dedup() - .collect()) + Ok(Some( + implementations_task + .await + .into_iter() + .flat_map(|(_, implementations)| implementations) + .dedup() + .collect(), + )) }) } } @@ -5544,59 +5500,44 @@ impl LspStore { buffer: &Entity, position: PointUtf16, cx: &mut Context, - ) -> Task>> { + ) -> Task>>> { if let Some((upstream_client, project_id)) = self.upstream_client() { let request = GetReferences { position }; if !self.is_capable_for_proto_request(buffer, &request, cx) { - return Task::ready(Ok(Vec::new())); + return Task::ready(Ok(None)); } - let request_task = upstream_client.request(proto::MultiLspQuery { - buffer_id: buffer.read(cx).remote_id().into(), - version: serialize_version(&buffer.read(cx).version()), + + let request_task = upstream_client.request_lsp( project_id, - strategy: Some(proto::multi_lsp_query::Strategy::All( - proto::AllLanguageServers {}, - )), - request: Some(proto::multi_lsp_query::Request::GetReferences( - request.to_proto(project_id, buffer.read(cx)), - )), - }); + LSP_REQUEST_TIMEOUT, + cx.background_executor().clone(), + request.to_proto(project_id, buffer.read(cx)), + ); let buffer = buffer.clone(); cx.spawn(async move |weak_project, cx| { let Some(project) = weak_project.upgrade() else { - return Ok(Vec::new()); + return Ok(None); + }; + let Some(responses) = request_task.await? else { + return Ok(None); }; - let responses = request_task.await?.responses; - let actions = join_all( - responses - .into_iter() - .filter_map(|lsp_response| match lsp_response.response? { - proto::lsp_response::Response::GetReferencesResponse(response) => { - Some(response) - } - unexpected => { - debug_panic!("Unexpected response: {unexpected:?}"); - None - } - }) - .map(|references_response| { - GetReferences { position }.response_from_proto( - references_response, - project.clone(), - buffer.clone(), - cx.clone(), - ) - }), - ) - .await; - Ok(actions - .into_iter() - .collect::>>>()? - .into_iter() - .flatten() - .dedup() - .collect()) + let locations = join_all(responses.payload.into_iter().map(|lsp_response| { + GetReferences { position }.response_from_proto( + lsp_response.response, + project.clone(), + buffer.clone(), + cx.clone(), + ) + })) + .await + .into_iter() + .collect::>>>()? + .into_iter() + .flatten() + .dedup() + .collect(); + Ok(Some(locations)) }) } else { let references_task = self.request_multiple_lsp_locally( @@ -5606,12 +5547,14 @@ impl LspStore { cx, ); cx.background_spawn(async move { - Ok(references_task - .await - .into_iter() - .flat_map(|(_, references)| references) - .dedup() - .collect()) + Ok(Some( + references_task + .await + .into_iter() + .flat_map(|(_, references)| references) + .dedup() + .collect(), + )) }) } } @@ -5622,65 +5565,51 @@ impl LspStore { range: Range, kinds: Option>, cx: &mut Context, - ) -> Task>> { + ) -> Task>>> { if let Some((upstream_client, project_id)) = self.upstream_client() { let request = GetCodeActions { range: range.clone(), kinds: kinds.clone(), }; if !self.is_capable_for_proto_request(buffer, &request, cx) { - return Task::ready(Ok(Vec::new())); + return Task::ready(Ok(None)); } - let request_task = upstream_client.request(proto::MultiLspQuery { - buffer_id: buffer.read(cx).remote_id().into(), - version: serialize_version(&buffer.read(cx).version()), + let request_task = upstream_client.request_lsp( project_id, - strategy: Some(proto::multi_lsp_query::Strategy::All( - proto::AllLanguageServers {}, - )), - request: Some(proto::multi_lsp_query::Request::GetCodeActions( - request.to_proto(project_id, buffer.read(cx)), - )), - }); + LSP_REQUEST_TIMEOUT, + cx.background_executor().clone(), + request.to_proto(project_id, buffer.read(cx)), + ); let buffer = buffer.clone(); cx.spawn(async move |weak_project, cx| { let Some(project) = weak_project.upgrade() else { - return Ok(Vec::new()); + return Ok(None); }; - let responses = request_task.await?.responses; - let actions = join_all( - responses - .into_iter() - .filter_map(|lsp_response| match lsp_response.response? { - proto::lsp_response::Response::GetCodeActionsResponse(response) => { - Some(response) - } - unexpected => { - debug_panic!("Unexpected response: {unexpected:?}"); - None - } - }) - .map(|code_actions_response| { - GetCodeActions { - range: range.clone(), - kinds: kinds.clone(), - } - .response_from_proto( - code_actions_response, - project.clone(), - buffer.clone(), - cx.clone(), - ) - }), - ) + let Some(responses) = request_task.await? else { + return Ok(None); + }; + let actions = join_all(responses.payload.into_iter().map(|response| { + GetCodeActions { + range: range.clone(), + kinds: kinds.clone(), + } + .response_from_proto( + response.response, + project.clone(), + buffer.clone(), + cx.clone(), + ) + })) .await; - Ok(actions - .into_iter() - .collect::>>>()? - .into_iter() - .flatten() - .collect()) + Ok(Some( + actions + .into_iter() + .collect::>>>()? + .into_iter() + .flatten() + .collect(), + )) }) } else { let all_actions_task = self.request_multiple_lsp_locally( @@ -5690,11 +5619,13 @@ impl LspStore { cx, ); cx.background_spawn(async move { - Ok(all_actions_task - .await - .into_iter() - .flat_map(|(_, actions)| actions) - .collect()) + Ok(Some( + all_actions_task + .await + .into_iter() + .flat_map(|(_, actions)| actions) + .collect(), + )) }) } } @@ -5719,8 +5650,10 @@ impl LspStore { != cached_data.lens.keys().copied().collect() }); if !has_different_servers { - return Task::ready(Ok(cached_data.lens.values().flatten().cloned().collect())) - .shared(); + return Task::ready(Ok(Some( + cached_data.lens.values().flatten().cloned().collect(), + ))) + .shared(); } } @@ -5758,17 +5691,19 @@ impl LspStore { lsp_store .update(cx, |lsp_store, _| { let lsp_data = lsp_store.lsp_code_lens.entry(buffer_id).or_default(); - if lsp_data.lens_for_version == query_version_queried_for { - lsp_data.lens.extend(fetched_lens.clone()); - } else if !lsp_data - .lens_for_version - .changed_since(&query_version_queried_for) - { - lsp_data.lens_for_version = query_version_queried_for; - lsp_data.lens = fetched_lens.clone(); + if let Some(fetched_lens) = fetched_lens { + if lsp_data.lens_for_version == query_version_queried_for { + lsp_data.lens.extend(fetched_lens.clone()); + } else if !lsp_data + .lens_for_version + .changed_since(&query_version_queried_for) + { + lsp_data.lens_for_version = query_version_queried_for; + lsp_data.lens = fetched_lens.clone(); + } } lsp_data.update = None; - lsp_data.lens.values().flatten().cloned().collect() + Some(lsp_data.lens.values().flatten().cloned().collect()) }) .map_err(Arc::new) }) @@ -5781,64 +5716,40 @@ impl LspStore { &mut self, buffer: &Entity, cx: &mut Context, - ) -> Task>>> { + ) -> Task>>>> { if let Some((upstream_client, project_id)) = self.upstream_client() { let request = GetCodeLens; if !self.is_capable_for_proto_request(buffer, &request, cx) { - return Task::ready(Ok(HashMap::default())); + return Task::ready(Ok(None)); } - let request_task = upstream_client.request(proto::MultiLspQuery { - buffer_id: buffer.read(cx).remote_id().into(), - version: serialize_version(&buffer.read(cx).version()), + let request_task = upstream_client.request_lsp( project_id, - strategy: Some(proto::multi_lsp_query::Strategy::All( - proto::AllLanguageServers {}, - )), - request: Some(proto::multi_lsp_query::Request::GetCodeLens( - request.to_proto(project_id, buffer.read(cx)), - )), - }); + LSP_REQUEST_TIMEOUT, + cx.background_executor().clone(), + request.to_proto(project_id, buffer.read(cx)), + ); let buffer = buffer.clone(); cx.spawn(async move |weak_lsp_store, cx| { let Some(lsp_store) = weak_lsp_store.upgrade() else { - return Ok(HashMap::default()); + return Ok(None); }; - let responses = request_task.await?.responses; - let code_lens_actions = join_all( - responses - .into_iter() - .filter_map(|lsp_response| { - let response = match lsp_response.response? { - proto::lsp_response::Response::GetCodeLensResponse(response) => { - Some(response) - } - unexpected => { - debug_panic!("Unexpected response: {unexpected:?}"); - None - } - }?; - let server_id = LanguageServerId::from_proto(lsp_response.server_id); - Some((server_id, response)) - }) - .map(|(server_id, code_lens_response)| { - let lsp_store = lsp_store.clone(); - let buffer = buffer.clone(); - let cx = cx.clone(); - async move { - ( - server_id, - GetCodeLens - .response_from_proto( - code_lens_response, - lsp_store, - buffer, - cx, - ) - .await, - ) - } - }), - ) + let Some(responses) = request_task.await? else { + return Ok(None); + }; + + let code_lens_actions = join_all(responses.payload.into_iter().map(|response| { + let lsp_store = lsp_store.clone(); + let buffer = buffer.clone(); + let cx = cx.clone(); + async move { + ( + LanguageServerId::from_proto(response.server_id), + GetCodeLens + .response_from_proto(response.response, lsp_store, buffer, cx) + .await, + ) + } + })) .await; let mut has_errors = false; @@ -5857,14 +5768,14 @@ impl LspStore { !has_errors || !code_lens_actions.is_empty(), "Failed to fetch code lens" ); - Ok(code_lens_actions) + Ok(Some(code_lens_actions)) }) } else { let code_lens_actions_task = self.request_multiple_lsp_locally(buffer, None::, GetCodeLens, cx); - cx.background_spawn( - async move { Ok(code_lens_actions_task.await.into_iter().collect()) }, - ) + cx.background_spawn(async move { + Ok(Some(code_lens_actions_task.await.into_iter().collect())) + }) } } @@ -6480,48 +6391,23 @@ impl LspStore { let buffer_id = buffer.read(cx).remote_id(); if let Some((client, upstream_project_id)) = self.upstream_client() { - if !self.is_capable_for_proto_request( - &buffer, - &GetDocumentDiagnostics { - previous_result_id: None, - }, - cx, - ) { + let request = GetDocumentDiagnostics { + previous_result_id: None, + }; + if !self.is_capable_for_proto_request(&buffer, &request, cx) { return Task::ready(Ok(None)); } - let request_task = client.request(proto::MultiLspQuery { - buffer_id: buffer_id.to_proto(), - version: serialize_version(&buffer.read(cx).version()), - project_id: upstream_project_id, - strategy: Some(proto::multi_lsp_query::Strategy::All( - proto::AllLanguageServers {}, - )), - request: Some(proto::multi_lsp_query::Request::GetDocumentDiagnostics( - proto::GetDocumentDiagnostics { - project_id: upstream_project_id, - buffer_id: buffer_id.to_proto(), - version: serialize_version(&buffer.read(cx).version()), - }, - )), - }); + let request_task = client.request_lsp( + upstream_project_id, + LSP_REQUEST_TIMEOUT, + cx.background_executor().clone(), + request.to_proto(upstream_project_id, buffer.read(cx)), + ); cx.background_spawn(async move { - let _proto_responses = request_task - .await? - .responses - .into_iter() - .filter_map(|lsp_response| match lsp_response.response? { - proto::lsp_response::Response::GetDocumentDiagnosticsResponse(response) => { - Some(response) - } - unexpected => { - debug_panic!("Unexpected response: {unexpected:?}"); - None - } - }) - .collect::>(); // Proto requests cause the diagnostics to be pulled from language server(s) on the local side // and then, buffer state updated with the diagnostics received, which will be later propagated to the client. // Do not attempt to further process the dummy responses here. + let _response = request_task.await?; Ok(None) }) } else { @@ -6806,16 +6692,18 @@ impl LspStore { .update(cx, |lsp_store, _| { let lsp_data = lsp_store.lsp_document_colors.entry(buffer_id).or_default(); - if lsp_data.colors_for_version == query_version_queried_for { - lsp_data.colors.extend(fetched_colors.clone()); - lsp_data.cache_version += 1; - } else if !lsp_data - .colors_for_version - .changed_since(&query_version_queried_for) - { - lsp_data.colors_for_version = query_version_queried_for; - lsp_data.colors = fetched_colors.clone(); - lsp_data.cache_version += 1; + if let Some(fetched_colors) = fetched_colors { + if lsp_data.colors_for_version == query_version_queried_for { + lsp_data.colors.extend(fetched_colors.clone()); + lsp_data.cache_version += 1; + } else if !lsp_data + .colors_for_version + .changed_since(&query_version_queried_for) + { + lsp_data.colors_for_version = query_version_queried_for; + lsp_data.colors = fetched_colors.clone(); + lsp_data.cache_version += 1; + } } lsp_data.colors_update = None; let colors = lsp_data @@ -6840,56 +6728,45 @@ impl LspStore { &mut self, buffer: &Entity, cx: &mut Context, - ) -> Task>>> { + ) -> Task>>>> { if let Some((client, project_id)) = self.upstream_client() { let request = GetDocumentColor {}; if !self.is_capable_for_proto_request(buffer, &request, cx) { - return Task::ready(Ok(HashMap::default())); + return Task::ready(Ok(None)); } - let request_task = client.request(proto::MultiLspQuery { + let request_task = client.request_lsp( project_id, - buffer_id: buffer.read(cx).remote_id().to_proto(), - version: serialize_version(&buffer.read(cx).version()), - strategy: Some(proto::multi_lsp_query::Strategy::All( - proto::AllLanguageServers {}, - )), - request: Some(proto::multi_lsp_query::Request::GetDocumentColor( - request.to_proto(project_id, buffer.read(cx)), - )), - }); + LSP_REQUEST_TIMEOUT, + cx.background_executor().clone(), + request.to_proto(project_id, buffer.read(cx)), + ); let buffer = buffer.clone(); - cx.spawn(async move |project, cx| { - let Some(project) = project.upgrade() else { - return Ok(HashMap::default()); + cx.spawn(async move |lsp_store, cx| { + let Some(project) = lsp_store.upgrade() else { + return Ok(None); }; let colors = join_all( request_task .await .log_err() - .map(|response| response.responses) + .flatten() + .map(|response| response.payload) .unwrap_or_default() .into_iter() - .filter_map(|lsp_response| match lsp_response.response? { - proto::lsp_response::Response::GetDocumentColorResponse(response) => { - Some(( - LanguageServerId::from_proto(lsp_response.server_id), - response, - )) - } - unexpected => { - debug_panic!("Unexpected response: {unexpected:?}"); - None - } - }) - .map(|(server_id, color_response)| { + .map(|color_response| { let response = request.response_from_proto( - color_response, + color_response.response, project.clone(), buffer.clone(), cx.clone(), ); - async move { (server_id, response.await.log_err().unwrap_or_default()) } + async move { + ( + LanguageServerId::from_proto(color_response.server_id), + response.await.log_err().unwrap_or_default(), + ) + } }), ) .await @@ -6900,23 +6777,25 @@ impl LspStore { .extend(colors); acc }); - Ok(colors) + Ok(Some(colors)) }) } else { let document_colors_task = self.request_multiple_lsp_locally(buffer, None::, GetDocumentColor, cx); cx.background_spawn(async move { - Ok(document_colors_task - .await - .into_iter() - .fold(HashMap::default(), |mut acc, (server_id, colors)| { - acc.entry(server_id) - .or_insert_with(HashSet::default) - .extend(colors); - acc - }) - .into_iter() - .collect()) + Ok(Some( + document_colors_task + .await + .into_iter() + .fold(HashMap::default(), |mut acc, (server_id, colors)| { + acc.entry(server_id) + .or_insert_with(HashSet::default) + .extend(colors); + acc + }) + .into_iter() + .collect(), + )) }) } } @@ -6926,49 +6805,34 @@ impl LspStore { buffer: &Entity, position: T, cx: &mut Context, - ) -> Task> { + ) -> Task>> { let position = position.to_point_utf16(buffer.read(cx)); if let Some((client, upstream_project_id)) = self.upstream_client() { let request = GetSignatureHelp { position }; if !self.is_capable_for_proto_request(buffer, &request, cx) { - return Task::ready(Vec::new()); + return Task::ready(None); } - let request_task = client.request(proto::MultiLspQuery { - buffer_id: buffer.read(cx).remote_id().into(), - version: serialize_version(&buffer.read(cx).version()), - project_id: upstream_project_id, - strategy: Some(proto::multi_lsp_query::Strategy::All( - proto::AllLanguageServers {}, - )), - request: Some(proto::multi_lsp_query::Request::GetSignatureHelp( - request.to_proto(upstream_project_id, buffer.read(cx)), - )), - }); + let request_task = client.request_lsp( + upstream_project_id, + LSP_REQUEST_TIMEOUT, + cx.background_executor().clone(), + request.to_proto(upstream_project_id, buffer.read(cx)), + ); let buffer = buffer.clone(); cx.spawn(async move |weak_project, cx| { - let Some(project) = weak_project.upgrade() else { - return Vec::new(); - }; - join_all( + let project = weak_project.upgrade()?; + let signatures = join_all( request_task .await .log_err() - .map(|response| response.responses) + .flatten() + .map(|response| response.payload) .unwrap_or_default() .into_iter() - .filter_map(|lsp_response| match lsp_response.response? { - proto::lsp_response::Response::GetSignatureHelpResponse(response) => { - Some(response) - } - unexpected => { - debug_panic!("Unexpected response: {unexpected:?}"); - None - } - }) - .map(|signature_response| { + .map(|response| { let response = GetSignatureHelp { position }.response_from_proto( - signature_response, + response.response, project.clone(), buffer.clone(), cx.clone(), @@ -6979,7 +6843,8 @@ impl LspStore { .await .into_iter() .flatten() - .collect() + .collect(); + Some(signatures) }) } else { let all_actions_task = self.request_multiple_lsp_locally( @@ -6989,11 +6854,13 @@ impl LspStore { cx, ); cx.background_spawn(async move { - all_actions_task - .await - .into_iter() - .flat_map(|(_, actions)| actions) - .collect::>() + Some( + all_actions_task + .await + .into_iter() + .flat_map(|(_, actions)| actions) + .collect::>(), + ) }) } } @@ -7003,47 +6870,32 @@ impl LspStore { buffer: &Entity, position: PointUtf16, cx: &mut Context, - ) -> Task> { + ) -> Task>> { if let Some((client, upstream_project_id)) = self.upstream_client() { let request = GetHover { position }; if !self.is_capable_for_proto_request(buffer, &request, cx) { - return Task::ready(Vec::new()); + return Task::ready(None); } - let request_task = client.request(proto::MultiLspQuery { - buffer_id: buffer.read(cx).remote_id().into(), - version: serialize_version(&buffer.read(cx).version()), - project_id: upstream_project_id, - strategy: Some(proto::multi_lsp_query::Strategy::All( - proto::AllLanguageServers {}, - )), - request: Some(proto::multi_lsp_query::Request::GetHover( - request.to_proto(upstream_project_id, buffer.read(cx)), - )), - }); + let request_task = client.request_lsp( + upstream_project_id, + LSP_REQUEST_TIMEOUT, + cx.background_executor().clone(), + request.to_proto(upstream_project_id, buffer.read(cx)), + ); let buffer = buffer.clone(); cx.spawn(async move |weak_project, cx| { - let Some(project) = weak_project.upgrade() else { - return Vec::new(); - }; - join_all( + let project = weak_project.upgrade()?; + let hovers = join_all( request_task .await .log_err() - .map(|response| response.responses) + .flatten() + .map(|response| response.payload) .unwrap_or_default() .into_iter() - .filter_map(|lsp_response| match lsp_response.response? { - proto::lsp_response::Response::GetHoverResponse(response) => { - Some(response) - } - unexpected => { - debug_panic!("Unexpected response: {unexpected:?}"); - None - } - }) - .map(|hover_response| { + .map(|response| { let response = GetHover { position }.response_from_proto( - hover_response, + response.response, project.clone(), buffer.clone(), cx.clone(), @@ -7060,7 +6912,8 @@ impl LspStore { .await .into_iter() .flatten() - .collect() + .collect(); + Some(hovers) }) } else { let all_actions_task = self.request_multiple_lsp_locally( @@ -7070,11 +6923,13 @@ impl LspStore { cx, ); cx.background_spawn(async move { - all_actions_task - .await - .into_iter() - .filter_map(|(_, hover)| remove_empty_hover_blocks(hover?)) - .collect::>() + Some( + all_actions_task + .await + .into_iter() + .filter_map(|(_, hover)| remove_empty_hover_blocks(hover?)) + .collect::>(), + ) }) } } @@ -8137,6 +7992,203 @@ impl LspStore { })? } + async fn handle_lsp_query( + lsp_store: Entity, + envelope: TypedEnvelope, + mut cx: AsyncApp, + ) -> Result { + use proto::lsp_query::Request; + let sender_id = envelope.original_sender_id().unwrap_or_default(); + let lsp_query = envelope.payload; + let lsp_request_id = LspRequestId(lsp_query.lsp_request_id); + match lsp_query.request.context("invalid LSP query request")? { + Request::GetReferences(get_references) => { + let position = get_references.position.clone().and_then(deserialize_anchor); + Self::query_lsp_locally::( + lsp_store, + sender_id, + lsp_request_id, + get_references, + position, + cx.clone(), + ) + .await?; + } + Request::GetDocumentColor(get_document_color) => { + Self::query_lsp_locally::( + lsp_store, + sender_id, + lsp_request_id, + get_document_color, + None, + cx.clone(), + ) + .await?; + } + Request::GetHover(get_hover) => { + let position = get_hover.position.clone().and_then(deserialize_anchor); + Self::query_lsp_locally::( + lsp_store, + sender_id, + lsp_request_id, + get_hover, + position, + cx.clone(), + ) + .await?; + } + Request::GetCodeActions(get_code_actions) => { + Self::query_lsp_locally::( + lsp_store, + sender_id, + lsp_request_id, + get_code_actions, + None, + cx.clone(), + ) + .await?; + } + Request::GetSignatureHelp(get_signature_help) => { + let position = get_signature_help + .position + .clone() + .and_then(deserialize_anchor); + Self::query_lsp_locally::( + lsp_store, + sender_id, + lsp_request_id, + get_signature_help, + position, + cx.clone(), + ) + .await?; + } + Request::GetCodeLens(get_code_lens) => { + Self::query_lsp_locally::( + lsp_store, + sender_id, + lsp_request_id, + get_code_lens, + None, + cx.clone(), + ) + .await?; + } + Request::GetDefinition(get_definition) => { + let position = get_definition.position.clone().and_then(deserialize_anchor); + Self::query_lsp_locally::( + lsp_store, + sender_id, + lsp_request_id, + get_definition, + position, + cx.clone(), + ) + .await?; + } + Request::GetDeclaration(get_declaration) => { + let position = get_declaration + .position + .clone() + .and_then(deserialize_anchor); + Self::query_lsp_locally::( + lsp_store, + sender_id, + lsp_request_id, + get_declaration, + position, + cx.clone(), + ) + .await?; + } + Request::GetTypeDefinition(get_type_definition) => { + let position = get_type_definition + .position + .clone() + .and_then(deserialize_anchor); + Self::query_lsp_locally::( + lsp_store, + sender_id, + lsp_request_id, + get_type_definition, + position, + cx.clone(), + ) + .await?; + } + Request::GetImplementation(get_implementation) => { + let position = get_implementation + .position + .clone() + .and_then(deserialize_anchor); + Self::query_lsp_locally::( + lsp_store, + sender_id, + lsp_request_id, + get_implementation, + position, + cx.clone(), + ) + .await?; + } + // Diagnostics pull synchronizes internally via the buffer state, and cannot be handled generically as the other requests. + Request::GetDocumentDiagnostics(get_document_diagnostics) => { + let buffer_id = BufferId::new(get_document_diagnostics.buffer_id())?; + let version = deserialize_version(get_document_diagnostics.buffer_version()); + let buffer = lsp_store.update(&mut cx, |this, cx| { + this.buffer_store.read(cx).get_existing(buffer_id) + })??; + buffer + .update(&mut cx, |buffer, _| { + buffer.wait_for_version(version.clone()) + })? + .await?; + lsp_store.update(&mut cx, |lsp_store, cx| { + let existing_queries = lsp_store + .running_lsp_requests + .entry(TypeId::of::()) + .or_default(); + if ::ProtoRequest::stop_previous_requests( + ) || buffer.read(cx).version.changed_since(&existing_queries.0) + { + existing_queries.1.clear(); + } + existing_queries.1.insert( + lsp_request_id, + cx.spawn(async move |lsp_store, cx| { + let diagnostics_pull = lsp_store + .update(cx, |lsp_store, cx| { + lsp_store.pull_diagnostics_for_buffer(buffer, cx) + }) + .ok(); + if let Some(diagnostics_pull) = diagnostics_pull { + match diagnostics_pull.await { + Ok(()) => {} + Err(e) => log::error!("Failed to pull diagnostics: {e:#}"), + }; + } + }), + ); + })?; + } + } + Ok(proto::Ack {}) + } + + async fn handle_lsp_query_response( + lsp_store: Entity, + envelope: TypedEnvelope, + cx: AsyncApp, + ) -> Result<()> { + lsp_store.read_with(&cx, |lsp_store, _| { + if let Some((upstream_client, _)) = lsp_store.upstream_client() { + upstream_client.handle_lsp_response(envelope.clone()); + } + })?; + Ok(()) + } + + // todo(lsp) remove after Zed Stable hits v0.204.x async fn handle_multi_lsp_query( lsp_store: Entity, envelope: TypedEnvelope, @@ -12012,6 +12064,88 @@ impl LspStore { Ok(()) } + async fn query_lsp_locally( + lsp_store: Entity, + sender_id: proto::PeerId, + lsp_request_id: LspRequestId, + proto_request: T::ProtoRequest, + position: Option, + mut cx: AsyncApp, + ) -> Result<()> + where + T: LspCommand + Clone, + T::ProtoRequest: proto::LspRequestMessage, + ::Response: + Into<::Response>, + { + let buffer_id = BufferId::new(proto_request.buffer_id())?; + let version = deserialize_version(proto_request.buffer_version()); + let buffer = lsp_store.update(&mut cx, |this, cx| { + this.buffer_store.read(cx).get_existing(buffer_id) + })??; + buffer + .update(&mut cx, |buffer, _| { + buffer.wait_for_version(version.clone()) + })? + .await?; + let buffer_version = buffer.read_with(&cx, |buffer, _| buffer.version())?; + let request = + T::from_proto(proto_request, lsp_store.clone(), buffer.clone(), cx.clone()).await?; + lsp_store.update(&mut cx, |lsp_store, cx| { + let request_task = + lsp_store.request_multiple_lsp_locally(&buffer, position, request, cx); + let existing_queries = lsp_store + .running_lsp_requests + .entry(TypeId::of::()) + .or_default(); + if T::ProtoRequest::stop_previous_requests() + || buffer_version.changed_since(&existing_queries.0) + { + existing_queries.1.clear(); + } + existing_queries.1.insert( + lsp_request_id, + cx.spawn(async move |lsp_store, cx| { + let response = request_task.await; + lsp_store + .update(cx, |lsp_store, cx| { + if let Some((client, project_id)) = lsp_store.downstream_client.clone() + { + let response = response + .into_iter() + .map(|(server_id, response)| { + ( + server_id.to_proto(), + T::response_to_proto( + response, + lsp_store, + sender_id, + &buffer_version, + cx, + ) + .into(), + ) + }) + .collect::>(); + match client.send_lsp_response::( + project_id, + lsp_request_id, + response, + ) { + Ok(()) => {} + Err(e) => { + log::error!("Failed to send LSP response: {e:#}",) + } + } + } + }) + .ok(); + }), + ); + })?; + Ok(()) + } + fn take_text_document_sync_options( capabilities: &mut lsp::ServerCapabilities, ) -> lsp::TextDocumentSyncOptions { @@ -12025,6 +12159,12 @@ impl LspStore { None => lsp::TextDocumentSyncOptions::default(), } } + + #[cfg(any(test, feature = "test-support"))] + pub fn forget_code_lens_task(&mut self, buffer_id: BufferId) -> Option { + let data = self.lsp_code_lens.get_mut(&buffer_id)?; + Some(data.update.take()?.1) + } } // Registration with registerOptions as null, should fallback to true. diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index e47c020a429fca8e6ed99aec6b89ace2a78d8985..ee4bfcb8ccf18417e18c6f4f408a892f5fe816a9 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -3415,7 +3415,7 @@ impl Project { buffer: &Entity, position: T, cx: &mut Context, - ) -> Task>> { + ) -> Task>>> { let position = position.to_point_utf16(buffer.read(cx)); let guard = self.retain_remotely_created_models(cx); let task = self.lsp_store.update(cx, |lsp_store, cx| { @@ -3433,7 +3433,7 @@ impl Project { buffer: &Entity, position: T, cx: &mut Context, - ) -> Task>> { + ) -> Task>>> { let position = position.to_point_utf16(buffer.read(cx)); let guard = self.retain_remotely_created_models(cx); let task = self.lsp_store.update(cx, |lsp_store, cx| { @@ -3451,7 +3451,7 @@ impl Project { buffer: &Entity, position: T, cx: &mut Context, - ) -> Task>> { + ) -> Task>>> { let position = position.to_point_utf16(buffer.read(cx)); let guard = self.retain_remotely_created_models(cx); let task = self.lsp_store.update(cx, |lsp_store, cx| { @@ -3469,7 +3469,7 @@ impl Project { buffer: &Entity, position: T, cx: &mut Context, - ) -> Task>> { + ) -> Task>>> { let position = position.to_point_utf16(buffer.read(cx)); let guard = self.retain_remotely_created_models(cx); let task = self.lsp_store.update(cx, |lsp_store, cx| { @@ -3487,7 +3487,7 @@ impl Project { buffer: &Entity, position: T, cx: &mut Context, - ) -> Task>> { + ) -> Task>>> { let position = position.to_point_utf16(buffer.read(cx)); let guard = self.retain_remotely_created_models(cx); let task = self.lsp_store.update(cx, |lsp_store, cx| { @@ -3585,23 +3585,12 @@ impl Project { }) } - pub fn signature_help( - &self, - buffer: &Entity, - position: T, - cx: &mut Context, - ) -> Task> { - self.lsp_store.update(cx, |lsp_store, cx| { - lsp_store.signature_help(buffer, position, cx) - }) - } - pub fn hover( &self, buffer: &Entity, position: T, cx: &mut Context, - ) -> Task> { + ) -> Task>> { let position = position.to_point_utf16(buffer.read(cx)); self.lsp_store .update(cx, |lsp_store, cx| lsp_store.hover(buffer, position, cx)) @@ -3637,7 +3626,7 @@ impl Project { range: Range, kinds: Option>, cx: &mut Context, - ) -> Task>> { + ) -> Task>>> { let buffer = buffer_handle.read(cx); let range = buffer.anchor_before(range.start)..buffer.anchor_before(range.end); self.lsp_store.update(cx, |lsp_store, cx| { @@ -3650,7 +3639,7 @@ impl Project { buffer: &Entity, range: Range, cx: &mut Context, - ) -> Task>> { + ) -> Task>>> { let snapshot = buffer.read(cx).snapshot(); let range = range.to_point(&snapshot); let range_start = snapshot.anchor_before(range.start); @@ -3668,16 +3657,18 @@ impl Project { let mut code_lens_actions = code_lens_actions .await .map_err(|e| anyhow!("code lens fetch failed: {e:#}"))?; - code_lens_actions.retain(|code_lens_action| { - range - .start - .cmp(&code_lens_action.range.start, &snapshot) - .is_ge() - && range - .end - .cmp(&code_lens_action.range.end, &snapshot) - .is_le() - }); + if let Some(code_lens_actions) = &mut code_lens_actions { + code_lens_actions.retain(|code_lens_action| { + range + .start + .cmp(&code_lens_action.range.start, &snapshot) + .is_ge() + && range + .end + .cmp(&code_lens_action.range.end, &snapshot) + .is_le() + }); + } Ok(code_lens_actions) }) } diff --git a/crates/project/src/project_tests.rs b/crates/project/src/project_tests.rs index 8b0b21fcd63e2f10509cb9c41a8cc50ca237791b..282f1facc2a9110bd27d249139f4cb4ac644c9c8 100644 --- a/crates/project/src/project_tests.rs +++ b/crates/project/src/project_tests.rs @@ -3005,6 +3005,7 @@ async fn test_definition(cx: &mut gpui::TestAppContext) { let mut definitions = project .update(cx, |project, cx| project.definitions(&buffer, 22, cx)) .await + .unwrap() .unwrap(); // Assert no new language server started @@ -3519,7 +3520,7 @@ async fn test_apply_code_actions_with_commands(cx: &mut gpui::TestAppContext) { .next() .await; - let action = actions.await.unwrap()[0].clone(); + let action = actions.await.unwrap().unwrap()[0].clone(); let apply = project.update(cx, |project, cx| { project.apply_code_action(buffer.clone(), action, true, cx) }); @@ -6110,6 +6111,7 @@ async fn test_multiple_language_server_hovers(cx: &mut gpui::TestAppContext) { hover_task .await .into_iter() + .flatten() .map(|hover| hover.contents.iter().map(|block| &block.text).join("|")) .sorted() .collect::>(), @@ -6183,6 +6185,7 @@ async fn test_hovers_with_empty_parts(cx: &mut gpui::TestAppContext) { hover_task .await .into_iter() + .flatten() .map(|hover| hover.contents.iter().map(|block| &block.text).join("|")) .sorted() .collect::>(), @@ -6261,7 +6264,7 @@ async fn test_code_actions_only_kinds(cx: &mut gpui::TestAppContext) { .await .expect("The code action request should have been triggered"); - let code_actions = code_actions_task.await.unwrap(); + let code_actions = code_actions_task.await.unwrap().unwrap(); assert_eq!(code_actions.len(), 1); assert_eq!( code_actions[0].lsp_action.action_kind(), @@ -6420,6 +6423,7 @@ async fn test_multiple_language_server_actions(cx: &mut gpui::TestAppContext) { code_actions_task .await .unwrap() + .unwrap() .into_iter() .map(|code_action| code_action.lsp_action.title().to_owned()) .sorted() diff --git a/crates/proto/proto/lsp.proto b/crates/proto/proto/lsp.proto index ea9647feff0cf811f0464dc4eca22059b348be6f..ac9c275aa2d67b3df78fc38d4e88497f9f10e6c9 100644 --- a/crates/proto/proto/lsp.proto +++ b/crates/proto/proto/lsp.proto @@ -753,26 +753,45 @@ message TextEdit { PointUtf16 lsp_range_end = 3; } -message MultiLspQuery { +message LspQuery { uint64 project_id = 1; - uint64 buffer_id = 2; - repeated VectorClockEntry version = 3; - oneof strategy { - AllLanguageServers all = 4; - } + uint64 lsp_request_id = 2; oneof request { + GetReferences get_references = 3; + GetDocumentColor get_document_color = 4; GetHover get_hover = 5; GetCodeActions get_code_actions = 6; GetSignatureHelp get_signature_help = 7; GetCodeLens get_code_lens = 8; GetDocumentDiagnostics get_document_diagnostics = 9; - GetDocumentColor get_document_color = 10; - GetDefinition get_definition = 11; - GetDeclaration get_declaration = 12; - GetTypeDefinition get_type_definition = 13; - GetImplementation get_implementation = 14; - GetReferences get_references = 15; + GetDefinition get_definition = 10; + GetDeclaration get_declaration = 11; + GetTypeDefinition get_type_definition = 12; + GetImplementation get_implementation = 13; + } +} + +message LspQueryResponse { + uint64 project_id = 1; + uint64 lsp_request_id = 2; + repeated LspResponse responses = 3; +} + +message LspResponse { + oneof response { + GetHoverResponse get_hover_response = 1; + GetCodeActionsResponse get_code_actions_response = 2; + GetSignatureHelpResponse get_signature_help_response = 3; + GetCodeLensResponse get_code_lens_response = 4; + GetDocumentDiagnosticsResponse get_document_diagnostics_response = 5; + GetDocumentColorResponse get_document_color_response = 6; + GetDefinitionResponse get_definition_response = 8; + GetDeclarationResponse get_declaration_response = 9; + GetTypeDefinitionResponse get_type_definition_response = 10; + GetImplementationResponse get_implementation_response = 11; + GetReferencesResponse get_references_response = 12; } + uint64 server_id = 7; } message AllLanguageServers {} @@ -798,27 +817,6 @@ message StopLanguageServers { bool all = 4; } -message MultiLspQueryResponse { - repeated LspResponse responses = 1; -} - -message LspResponse { - oneof response { - GetHoverResponse get_hover_response = 1; - GetCodeActionsResponse get_code_actions_response = 2; - GetSignatureHelpResponse get_signature_help_response = 3; - GetCodeLensResponse get_code_lens_response = 4; - GetDocumentDiagnosticsResponse get_document_diagnostics_response = 5; - GetDocumentColorResponse get_document_color_response = 6; - GetDefinitionResponse get_definition_response = 8; - GetDeclarationResponse get_declaration_response = 9; - GetTypeDefinitionResponse get_type_definition_response = 10; - GetImplementationResponse get_implementation_response = 11; - GetReferencesResponse get_references_response = 12; - } - uint64 server_id = 7; -} - message LspExtRunnables { uint64 project_id = 1; uint64 buffer_id = 2; @@ -909,3 +907,30 @@ message PullWorkspaceDiagnostics { uint64 project_id = 1; uint64 server_id = 2; } + +// todo(lsp) remove after Zed Stable hits v0.204.x +message MultiLspQuery { + uint64 project_id = 1; + uint64 buffer_id = 2; + repeated VectorClockEntry version = 3; + oneof strategy { + AllLanguageServers all = 4; + } + oneof request { + GetHover get_hover = 5; + GetCodeActions get_code_actions = 6; + GetSignatureHelp get_signature_help = 7; + GetCodeLens get_code_lens = 8; + GetDocumentDiagnostics get_document_diagnostics = 9; + GetDocumentColor get_document_color = 10; + GetDefinition get_definition = 11; + GetDeclaration get_declaration = 12; + GetTypeDefinition get_type_definition = 13; + GetImplementation get_implementation = 14; + GetReferences get_references = 15; + } +} + +message MultiLspQueryResponse { + repeated LspResponse responses = 1; +} diff --git a/crates/proto/proto/zed.proto b/crates/proto/proto/zed.proto index 310fcf584e99a82606fdfdf39237b808adc61c9f..70689bcd6306195fce0d5c6449bf3dd9f5d43539 100644 --- a/crates/proto/proto/zed.proto +++ b/crates/proto/proto/zed.proto @@ -393,7 +393,10 @@ message Envelope { GetCrashFilesResponse get_crash_files_response = 362; GitClone git_clone = 363; - GitCloneResponse git_clone_response = 364; // current max + GitCloneResponse git_clone_response = 364; + + LspQuery lsp_query = 365; + LspQueryResponse lsp_query_response = 366; // current max } reserved 87 to 88; diff --git a/crates/proto/src/macros.rs b/crates/proto/src/macros.rs index 2ce0c0df259d8d0dc352e118ff53c872852d9fec..59e984d7dbbcd52fb70b7513aea0b5bcb399c204 100644 --- a/crates/proto/src/macros.rs +++ b/crates/proto/src/macros.rs @@ -69,3 +69,32 @@ macro_rules! entity_messages { })* }; } + +#[macro_export] +macro_rules! lsp_messages { + ($(($request_name:ident, $response_name:ident, $stop_previous_requests:expr)),* $(,)?) => { + $(impl LspRequestMessage for $request_name { + type Response = $response_name; + + fn to_proto_query(self) -> $crate::lsp_query::Request { + $crate::lsp_query::Request::$request_name(self) + } + + fn response_to_proto_query(response: Self::Response) -> $crate::lsp_response::Response { + $crate::lsp_response::Response::$response_name(response) + } + + fn buffer_id(&self) -> u64 { + self.buffer_id + } + + fn buffer_version(&self) -> &[$crate::VectorClockEntry] { + &self.version + } + + fn stop_previous_requests() -> bool { + $stop_previous_requests + } + })* + }; +} diff --git a/crates/proto/src/proto.rs b/crates/proto/src/proto.rs index 802db09590a5bb6fc316ce31bd880d394c06c5ca..d38e54685ffb78fe8621b12a0dd25bb6d1ab3f6e 100644 --- a/crates/proto/src/proto.rs +++ b/crates/proto/src/proto.rs @@ -169,6 +169,9 @@ messages!( (MarkNotificationRead, Foreground), (MoveChannel, Foreground), (ReorderChannel, Foreground), + (LspQuery, Background), + (LspQueryResponse, Background), + // todo(lsp) remove after Zed Stable hits v0.204.x (MultiLspQuery, Background), (MultiLspQueryResponse, Background), (OnTypeFormatting, Background), @@ -426,7 +429,10 @@ request_messages!( (SetRoomParticipantRole, Ack), (BlameBuffer, BlameBufferResponse), (RejoinRemoteProjects, RejoinRemoteProjectsResponse), + // todo(lsp) remove after Zed Stable hits v0.204.x (MultiLspQuery, MultiLspQueryResponse), + (LspQuery, Ack), + (LspQueryResponse, Ack), (RestartLanguageServers, Ack), (StopLanguageServers, Ack), (OpenContext, OpenContextResponse), @@ -478,6 +484,20 @@ request_messages!( (GitClone, GitCloneResponse) ); +lsp_messages!( + (GetReferences, GetReferencesResponse, true), + (GetDocumentColor, GetDocumentColorResponse, true), + (GetHover, GetHoverResponse, true), + (GetCodeActions, GetCodeActionsResponse, true), + (GetSignatureHelp, GetSignatureHelpResponse, true), + (GetCodeLens, GetCodeLensResponse, true), + (GetDocumentDiagnostics, GetDocumentDiagnosticsResponse, true), + (GetDefinition, GetDefinitionResponse, true), + (GetDeclaration, GetDeclarationResponse, true), + (GetTypeDefinition, GetTypeDefinitionResponse, true), + (GetImplementation, GetImplementationResponse, true), +); + entity_messages!( {project_id, ShareProject}, AddProjectCollaborator, @@ -520,6 +540,9 @@ entity_messages!( LeaveProject, LinkedEditingRange, LoadCommitDiff, + LspQuery, + LspQueryResponse, + // todo(lsp) remove after Zed Stable hits v0.204.x MultiLspQuery, RestartLanguageServers, StopLanguageServers, @@ -777,6 +800,28 @@ pub fn split_repository_update( }]) } +impl LspQuery { + pub fn query_name_and_write_permissions(&self) -> (&str, bool) { + match self.request { + Some(lsp_query::Request::GetHover(_)) => ("GetHover", false), + Some(lsp_query::Request::GetCodeActions(_)) => ("GetCodeActions", true), + Some(lsp_query::Request::GetSignatureHelp(_)) => ("GetSignatureHelp", false), + Some(lsp_query::Request::GetCodeLens(_)) => ("GetCodeLens", true), + Some(lsp_query::Request::GetDocumentDiagnostics(_)) => { + ("GetDocumentDiagnostics", false) + } + Some(lsp_query::Request::GetDefinition(_)) => ("GetDefinition", false), + Some(lsp_query::Request::GetDeclaration(_)) => ("GetDeclaration", false), + Some(lsp_query::Request::GetTypeDefinition(_)) => ("GetTypeDefinition", false), + Some(lsp_query::Request::GetImplementation(_)) => ("GetImplementation", false), + Some(lsp_query::Request::GetReferences(_)) => ("GetReferences", false), + Some(lsp_query::Request::GetDocumentColor(_)) => ("GetDocumentColor", false), + None => ("", true), + } + } +} + +// todo(lsp) remove after Zed Stable hits v0.204.x impl MultiLspQuery { pub fn request_str(&self) -> &str { match self.request { diff --git a/crates/proto/src/typed_envelope.rs b/crates/proto/src/typed_envelope.rs index 381a6379dc95b9f96025315c357fecfe5b8fc937..f677a3b96728a574416cbfc1ec97799ac19184fa 100644 --- a/crates/proto/src/typed_envelope.rs +++ b/crates/proto/src/typed_envelope.rs @@ -31,6 +31,58 @@ pub trait RequestMessage: EnvelopedMessage { type Response: EnvelopedMessage; } +/// A trait to bind LSP request and responses for the proto layer. +/// Should be used for every LSP request that has to traverse through the proto layer. +/// +/// `lsp_messages` macro in the same crate provides a convenient way to implement this. +pub trait LspRequestMessage: EnvelopedMessage { + type Response: EnvelopedMessage; + + fn to_proto_query(self) -> crate::lsp_query::Request; + + fn response_to_proto_query(response: Self::Response) -> crate::lsp_response::Response; + + fn buffer_id(&self) -> u64; + + fn buffer_version(&self) -> &[crate::VectorClockEntry]; + + /// Whether to deduplicate the requests, or keep the previous ones running when another + /// request of the same kind is processed. + fn stop_previous_requests() -> bool; +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct LspRequestId(pub u64); + +/// A response from a single language server. +/// There could be multiple responses for a single LSP request, +/// from different servers. +pub struct ProtoLspResponse { + pub server_id: u64, + pub response: R, +} + +impl ProtoLspResponse> { + pub fn into_response(self) -> Result> { + let envelope = self + .response + .into_any() + .downcast::>() + .map_err(|_| { + anyhow::anyhow!( + "cannot downcast LspResponse to {} for message {}", + T::Response::NAME, + T::NAME, + ) + })?; + + Ok(ProtoLspResponse { + server_id: self.server_id, + response: envelope.payload, + }) + } +} + pub trait AnyTypedEnvelope: Any + Send + Sync { fn payload_type_id(&self) -> TypeId; fn payload_type_name(&self) -> &'static str; diff --git a/crates/rpc/src/proto_client.rs b/crates/rpc/src/proto_client.rs index 05b6bd1439c96a3c49dbabe69453e491f23b02da..791b7db9c0ad8e6116ed7fe60a84aeed82a99435 100644 --- a/crates/rpc/src/proto_client.rs +++ b/crates/rpc/src/proto_client.rs @@ -1,35 +1,48 @@ -use anyhow::Context; +use anyhow::{Context, Result}; use collections::HashMap; use futures::{ Future, FutureExt as _, + channel::oneshot, future::{BoxFuture, LocalBoxFuture}, }; -use gpui::{AnyEntity, AnyWeakEntity, AsyncApp, Entity}; +use gpui::{AnyEntity, AnyWeakEntity, AsyncApp, BackgroundExecutor, Entity, FutureExt as _}; +use parking_lot::Mutex; use proto::{ - AnyTypedEnvelope, EntityMessage, Envelope, EnvelopedMessage, RequestMessage, TypedEnvelope, - error::ErrorExt as _, + AnyTypedEnvelope, EntityMessage, Envelope, EnvelopedMessage, LspRequestId, LspRequestMessage, + RequestMessage, TypedEnvelope, error::ErrorExt as _, }; use std::{ any::{Any, TypeId}, - sync::{Arc, Weak}, + sync::{ + Arc, OnceLock, + atomic::{self, AtomicU64}, + }, + time::Duration, }; #[derive(Clone)] -pub struct AnyProtoClient(Arc); +pub struct AnyProtoClient(Arc); -impl AnyProtoClient { - pub fn downgrade(&self) -> AnyWeakProtoClient { - AnyWeakProtoClient(Arc::downgrade(&self.0)) - } -} +type RequestIds = Arc< + Mutex< + HashMap< + LspRequestId, + oneshot::Sender< + Result< + Option>>>>, + >, + >, + >, + >, +>; -#[derive(Clone)] -pub struct AnyWeakProtoClient(Weak); +static NEXT_LSP_REQUEST_ID: OnceLock> = OnceLock::new(); +static REQUEST_IDS: OnceLock = OnceLock::new(); -impl AnyWeakProtoClient { - pub fn upgrade(&self) -> Option { - self.0.upgrade().map(AnyProtoClient) - } +struct State { + client: Arc, + next_lsp_request_id: Arc, + request_ids: RequestIds, } pub trait ProtoClient: Send + Sync { @@ -37,11 +50,11 @@ pub trait ProtoClient: Send + Sync { &self, envelope: Envelope, request_type: &'static str, - ) -> BoxFuture<'static, anyhow::Result>; + ) -> BoxFuture<'static, Result>; - fn send(&self, envelope: Envelope, message_type: &'static str) -> anyhow::Result<()>; + fn send(&self, envelope: Envelope, message_type: &'static str) -> Result<()>; - fn send_response(&self, envelope: Envelope, message_type: &'static str) -> anyhow::Result<()>; + fn send_response(&self, envelope: Envelope, message_type: &'static str) -> Result<()>; fn message_handler_set(&self) -> &parking_lot::Mutex; @@ -65,7 +78,7 @@ pub type ProtoMessageHandler = Arc< Box, AnyProtoClient, AsyncApp, - ) -> LocalBoxFuture<'static, anyhow::Result<()>>, + ) -> LocalBoxFuture<'static, Result<()>>, >; impl ProtoMessageHandlerSet { @@ -113,7 +126,7 @@ impl ProtoMessageHandlerSet { message: Box, client: AnyProtoClient, cx: AsyncApp, - ) -> Option>> { + ) -> Option>> { let payload_type_id = message.payload_type_id(); let mut this = this.lock(); let handler = this.message_handlers.get(&payload_type_id)?.clone(); @@ -169,43 +182,195 @@ where T: ProtoClient + 'static, { fn from(client: Arc) -> Self { - Self(client) + Self::new(client) } } impl AnyProtoClient { pub fn new(client: Arc) -> Self { - Self(client) + Self(Arc::new(State { + client, + next_lsp_request_id: NEXT_LSP_REQUEST_ID + .get_or_init(|| Arc::new(AtomicU64::new(0))) + .clone(), + request_ids: REQUEST_IDS.get_or_init(RequestIds::default).clone(), + })) } pub fn is_via_collab(&self) -> bool { - self.0.is_via_collab() + self.0.client.is_via_collab() } pub fn request( &self, request: T, - ) -> impl Future> + use { + ) -> impl Future> + use { let envelope = request.into_envelope(0, None, None); - let response = self.0.request(envelope, T::NAME); + let response = self.0.client.request(envelope, T::NAME); async move { T::Response::from_envelope(response.await?) .context("received response of the wrong type") } } - pub fn send(&self, request: T) -> anyhow::Result<()> { + pub fn send(&self, request: T) -> Result<()> { let envelope = request.into_envelope(0, None, None); - self.0.send(envelope, T::NAME) + self.0.client.send(envelope, T::NAME) + } + + pub fn send_response(&self, request_id: u32, request: T) -> Result<()> { + let envelope = request.into_envelope(0, Some(request_id), None); + self.0.client.send(envelope, T::NAME) } - pub fn send_response( + pub fn request_lsp( &self, - request_id: u32, + project_id: u64, + timeout: Duration, + executor: BackgroundExecutor, request: T, - ) -> anyhow::Result<()> { - let envelope = request.into_envelope(0, Some(request_id), None); - self.0.send(envelope, T::NAME) + ) -> impl Future< + Output = Result>>>>, + > + use + where + T: LspRequestMessage, + { + let new_id = LspRequestId( + self.0 + .next_lsp_request_id + .fetch_add(1, atomic::Ordering::Acquire), + ); + let (tx, rx) = oneshot::channel(); + { + self.0.request_ids.lock().insert(new_id, tx); + } + + let query = proto::LspQuery { + project_id, + lsp_request_id: new_id.0, + request: Some(request.clone().to_proto_query()), + }; + let request = self.request(query); + let request_ids = self.0.request_ids.clone(); + async move { + match request.await { + Ok(_request_enqueued) => {} + Err(e) => { + request_ids.lock().remove(&new_id); + return Err(e).context("sending LSP proto request"); + } + } + + let response = rx.with_timeout(timeout, &executor).await; + { + request_ids.lock().remove(&new_id); + } + match response { + Ok(Ok(response)) => { + let response = response + .context("waiting for LSP proto response")? + .map(|response| { + anyhow::Ok(TypedEnvelope { + payload: response + .payload + .into_iter() + .map(|lsp_response| lsp_response.into_response::()) + .collect::>>()?, + sender_id: response.sender_id, + original_sender_id: response.original_sender_id, + message_id: response.message_id, + received_at: response.received_at, + }) + }) + .transpose() + .context("converting LSP proto response")?; + Ok(response) + } + Err(_cancelled_due_timeout) => Ok(None), + Ok(Err(_channel_dropped)) => Ok(None), + } + } + } + + pub fn send_lsp_response( + &self, + project_id: u64, + lsp_request_id: LspRequestId, + server_responses: HashMap, + ) -> Result<()> { + self.send(proto::LspQueryResponse { + project_id, + lsp_request_id: lsp_request_id.0, + responses: server_responses + .into_iter() + .map(|(server_id, response)| proto::LspResponse { + server_id, + response: Some(T::response_to_proto_query(response)), + }) + .collect(), + }) + } + + pub fn handle_lsp_response(&self, mut envelope: TypedEnvelope) { + let request_id = LspRequestId(envelope.payload.lsp_request_id); + let mut response_senders = self.0.request_ids.lock(); + if let Some(tx) = response_senders.remove(&request_id) { + let responses = envelope.payload.responses.drain(..).collect::>(); + tx.send(Ok(Some(proto::TypedEnvelope { + sender_id: envelope.sender_id, + original_sender_id: envelope.original_sender_id, + message_id: envelope.message_id, + received_at: envelope.received_at, + payload: responses + .into_iter() + .filter_map(|response| { + use proto::lsp_response::Response; + + let server_id = response.server_id; + let response = match response.response? { + Response::GetReferencesResponse(response) => { + to_any_envelope(&envelope, response) + } + Response::GetDocumentColorResponse(response) => { + to_any_envelope(&envelope, response) + } + Response::GetHoverResponse(response) => { + to_any_envelope(&envelope, response) + } + Response::GetCodeActionsResponse(response) => { + to_any_envelope(&envelope, response) + } + Response::GetSignatureHelpResponse(response) => { + to_any_envelope(&envelope, response) + } + Response::GetCodeLensResponse(response) => { + to_any_envelope(&envelope, response) + } + Response::GetDocumentDiagnosticsResponse(response) => { + to_any_envelope(&envelope, response) + } + Response::GetDefinitionResponse(response) => { + to_any_envelope(&envelope, response) + } + Response::GetDeclarationResponse(response) => { + to_any_envelope(&envelope, response) + } + Response::GetTypeDefinitionResponse(response) => { + to_any_envelope(&envelope, response) + } + Response::GetImplementationResponse(response) => { + to_any_envelope(&envelope, response) + } + }; + Some(proto::ProtoLspResponse { + server_id, + response, + }) + }) + .collect(), + }))) + .ok(); + } } pub fn add_request_handler(&self, entity: gpui::WeakEntity, handler: H) @@ -213,31 +378,35 @@ impl AnyProtoClient { M: RequestMessage, E: 'static, H: 'static + Sync + Fn(Entity, TypedEnvelope, AsyncApp) -> F + Send + Sync, - F: 'static + Future>, + F: 'static + Future>, { - self.0.message_handler_set().lock().add_message_handler( - TypeId::of::(), - entity.into(), - Arc::new(move |entity, envelope, client, cx| { - let entity = entity.downcast::().unwrap(); - let envelope = envelope.into_any().downcast::>().unwrap(); - let request_id = envelope.message_id(); - handler(entity, *envelope, cx) - .then(move |result| async move { - match result { - Ok(response) => { - client.send_response(request_id, response)?; - Ok(()) - } - Err(error) => { - client.send_response(request_id, error.to_proto())?; - Err(error) + self.0 + .client + .message_handler_set() + .lock() + .add_message_handler( + TypeId::of::(), + entity.into(), + Arc::new(move |entity, envelope, client, cx| { + let entity = entity.downcast::().unwrap(); + let envelope = envelope.into_any().downcast::>().unwrap(); + let request_id = envelope.message_id(); + handler(entity, *envelope, cx) + .then(move |result| async move { + match result { + Ok(response) => { + client.send_response(request_id, response)?; + Ok(()) + } + Err(error) => { + client.send_response(request_id, error.to_proto())?; + Err(error) + } } - } - }) - .boxed_local() - }), - ) + }) + .boxed_local() + }), + ) } pub fn add_entity_request_handler(&self, handler: H) @@ -245,7 +414,7 @@ impl AnyProtoClient { M: EnvelopedMessage + RequestMessage + EntityMessage, E: 'static, H: 'static + Sync + Send + Fn(gpui::Entity, TypedEnvelope, AsyncApp) -> F, - F: 'static + Future>, + F: 'static + Future>, { let message_type_id = TypeId::of::(); let entity_type_id = TypeId::of::(); @@ -257,6 +426,7 @@ impl AnyProtoClient { .remote_entity_id() }; self.0 + .client .message_handler_set() .lock() .add_entity_message_handler( @@ -290,7 +460,7 @@ impl AnyProtoClient { M: EnvelopedMessage + EntityMessage, E: 'static, H: 'static + Sync + Send + Fn(gpui::Entity, TypedEnvelope, AsyncApp) -> F, - F: 'static + Future>, + F: 'static + Future>, { let message_type_id = TypeId::of::(); let entity_type_id = TypeId::of::(); @@ -302,6 +472,7 @@ impl AnyProtoClient { .remote_entity_id() }; self.0 + .client .message_handler_set() .lock() .add_entity_message_handler( @@ -319,7 +490,7 @@ impl AnyProtoClient { pub fn subscribe_to_entity(&self, remote_id: u64, entity: &Entity) { let id = (TypeId::of::(), remote_id); - let mut message_handlers = self.0.message_handler_set().lock(); + let mut message_handlers = self.0.client.message_handler_set().lock(); if message_handlers .entities_by_type_and_remote_id .contains_key(&id) @@ -335,3 +506,16 @@ impl AnyProtoClient { ); } } + +fn to_any_envelope( + envelope: &TypedEnvelope, + response: T, +) -> Box { + Box::new(proto::TypedEnvelope { + sender_id: envelope.sender_id, + original_sender_id: envelope.original_sender_id, + message_id: envelope.message_id, + received_at: envelope.received_at, + payload: response, + }) as Box<_> +}