Fix pull diagnostics on the remote clients (#33028)

Kirill Bulatov created

Also add a test.

Release Notes:

- Fixed pull diagnostics on the remote clients

Change summary

crates/collab/src/tests/editor_tests.rs    | 612 +++++++++++++++++++++++
crates/project/src/lsp_command.rs          |   1 
crates/project/src/lsp_store.rs            | 137 ++---
crates/project/src/lsp_store/clangd_ext.rs |   2 
4 files changed, 666 insertions(+), 86 deletions(-)

Detailed changes

crates/collab/src/tests/editor_tests.rs 🔗

@@ -7,7 +7,7 @@ use editor::{
     DocumentColorsRenderMode, Editor, EditorSettings, RowInfo,
     actions::{
         ConfirmCodeAction, ConfirmCompletion, ConfirmRename, ContextMenuFirst,
-        ExpandMacroRecursively, Redo, Rename, SelectAll, ToggleCodeActions, Undo,
+        ExpandMacroRecursively, MoveToEnd, Redo, Rename, SelectAll, ToggleCodeActions, Undo,
     },
     test::{
         editor_test_context::{AssertionContextManager, EditorTestContext},
@@ -15,7 +15,7 @@ use editor::{
     },
 };
 use fs::Fs;
-use futures::StreamExt;
+use futures::{StreamExt, lock::Mutex};
 use gpui::{App, Rgba, TestAppContext, UpdateGlobal, VisualContext, VisualTestContext};
 use indoc::indoc;
 use language::{
@@ -35,7 +35,8 @@ use rpc::RECEIVE_TIMEOUT;
 use serde_json::json;
 use settings::SettingsStore;
 use std::{
-    ops::Range,
+    collections::BTreeSet,
+    ops::{Deref as _, Range},
     path::{Path, PathBuf},
     sync::{
         Arc,
@@ -2227,6 +2228,611 @@ async fn test_lsp_document_color(cx_a: &mut TestAppContext, cx_b: &mut TestAppCo
     });
 }
 
+#[gpui::test(iterations = 10)]
+async fn test_lsp_pull_diagnostics(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
+    let mut server = TestServer::start(cx_a.executor()).await;
+    let executor = cx_a.executor();
+    let client_a = server.create_client(cx_a, "user_a").await;
+    let client_b = server.create_client(cx_b, "user_b").await;
+    server
+        .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)])
+        .await;
+    let active_call_a = cx_a.read(ActiveCall::global);
+    let active_call_b = cx_b.read(ActiveCall::global);
+
+    cx_a.update(editor::init);
+    cx_b.update(editor::init);
+
+    client_a.language_registry().add(rust_lang());
+    client_b.language_registry().add(rust_lang());
+    let mut fake_language_servers = client_a.language_registry().register_fake_lsp(
+        "Rust",
+        FakeLspAdapter {
+            capabilities: lsp::ServerCapabilities {
+                diagnostic_provider: Some(lsp::DiagnosticServerCapabilities::Options(
+                    lsp::DiagnosticOptions {
+                        identifier: Some("test-pulls".to_string()),
+                        inter_file_dependencies: true,
+                        workspace_diagnostics: true,
+                        work_done_progress_options: lsp::WorkDoneProgressOptions {
+                            work_done_progress: None,
+                        },
+                    },
+                )),
+                ..lsp::ServerCapabilities::default()
+            },
+            ..FakeLspAdapter::default()
+        },
+    );
+
+    // Client A opens a project.
+    client_a
+        .fs()
+        .insert_tree(
+            path!("/a"),
+            json!({
+                "main.rs": "fn main() { a }",
+                "lib.rs": "fn other() {}",
+            }),
+        )
+        .await;
+    let (project_a, worktree_id) = client_a.build_local_project(path!("/a"), cx_a).await;
+    active_call_a
+        .update(cx_a, |call, cx| call.set_location(Some(&project_a), cx))
+        .await
+        .unwrap();
+    let project_id = active_call_a
+        .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
+        .await
+        .unwrap();
+
+    // Client B joins the project
+    let project_b = client_b.join_remote_project(project_id, cx_b).await;
+    active_call_b
+        .update(cx_b, |call, cx| call.set_location(Some(&project_b), cx))
+        .await
+        .unwrap();
+
+    let (workspace_a, cx_a) = client_a.build_workspace(&project_a, cx_a);
+    executor.start_waiting();
+
+    // The host opens a rust file.
+    let _buffer_a = project_a
+        .update(cx_a, |project, cx| {
+            project.open_local_buffer(path!("/a/main.rs"), cx)
+        })
+        .await
+        .unwrap();
+    let editor_a_main = workspace_a
+        .update_in(cx_a, |workspace, window, cx| {
+            workspace.open_path((worktree_id, "main.rs"), None, true, window, cx)
+        })
+        .await
+        .unwrap()
+        .downcast::<Editor>()
+        .unwrap();
+
+    let fake_language_server = fake_language_servers.next().await.unwrap();
+    let expected_push_diagnostic_main_message = "pushed main diagnostic";
+    let expected_push_diagnostic_lib_message = "pushed lib diagnostic";
+    let expected_pull_diagnostic_main_message = "pulled main diagnostic";
+    let expected_pull_diagnostic_lib_message = "pulled lib diagnostic";
+    let expected_workspace_pull_diagnostics_main_message = "pulled workspace main diagnostic";
+    let expected_workspace_pull_diagnostics_lib_message = "pulled workspace lib diagnostic";
+
+    let diagnostics_pulls_result_ids = Arc::new(Mutex::new(BTreeSet::<Option<String>>::new()));
+    let workspace_diagnostics_pulls_result_ids = Arc::new(Mutex::new(BTreeSet::<String>::new()));
+    let diagnostics_pulls_made = Arc::new(AtomicUsize::new(0));
+    let closure_diagnostics_pulls_made = diagnostics_pulls_made.clone();
+    let closure_diagnostics_pulls_result_ids = diagnostics_pulls_result_ids.clone();
+    let mut pull_diagnostics_handle = fake_language_server
+        .set_request_handler::<lsp::request::DocumentDiagnosticRequest, _, _>(move |params, _| {
+            let requests_made = closure_diagnostics_pulls_made.clone();
+            let diagnostics_pulls_result_ids = closure_diagnostics_pulls_result_ids.clone();
+            async move {
+                let message = if lsp::Url::from_file_path(path!("/a/main.rs")).unwrap()
+                    == params.text_document.uri
+                {
+                    expected_pull_diagnostic_main_message.to_string()
+                } else if lsp::Url::from_file_path(path!("/a/lib.rs")).unwrap()
+                    == params.text_document.uri
+                {
+                    expected_pull_diagnostic_lib_message.to_string()
+                } else {
+                    panic!("Unexpected document: {}", params.text_document.uri)
+                };
+                {
+                    diagnostics_pulls_result_ids
+                        .lock()
+                        .await
+                        .insert(params.previous_result_id);
+                }
+                let new_requests_count = requests_made.fetch_add(1, atomic::Ordering::Release) + 1;
+                Ok(lsp::DocumentDiagnosticReportResult::Report(
+                    lsp::DocumentDiagnosticReport::Full(lsp::RelatedFullDocumentDiagnosticReport {
+                        related_documents: None,
+                        full_document_diagnostic_report: lsp::FullDocumentDiagnosticReport {
+                            result_id: Some(format!("pull-{new_requests_count}")),
+                            items: vec![lsp::Diagnostic {
+                                range: lsp::Range {
+                                    start: lsp::Position {
+                                        line: 0,
+                                        character: 0,
+                                    },
+                                    end: lsp::Position {
+                                        line: 0,
+                                        character: 2,
+                                    },
+                                },
+                                severity: Some(lsp::DiagnosticSeverity::ERROR),
+                                message,
+                                ..lsp::Diagnostic::default()
+                            }],
+                        },
+                    }),
+                ))
+            }
+        });
+
+    let workspace_diagnostics_pulls_made = Arc::new(AtomicUsize::new(0));
+    let closure_workspace_diagnostics_pulls_made = workspace_diagnostics_pulls_made.clone();
+    let closure_workspace_diagnostics_pulls_result_ids =
+        workspace_diagnostics_pulls_result_ids.clone();
+    let mut workspace_diagnostics_pulls_handle = fake_language_server
+        .set_request_handler::<lsp::request::WorkspaceDiagnosticRequest, _, _>(
+        move |params, _| {
+            let workspace_requests_made = closure_workspace_diagnostics_pulls_made.clone();
+            let workspace_diagnostics_pulls_result_ids =
+                closure_workspace_diagnostics_pulls_result_ids.clone();
+            async move {
+                let workspace_request_count =
+                    workspace_requests_made.fetch_add(1, atomic::Ordering::Release) + 1;
+                {
+                    workspace_diagnostics_pulls_result_ids
+                        .lock()
+                        .await
+                        .extend(params.previous_result_ids.into_iter().map(|id| id.value));
+                }
+                Ok(lsp::WorkspaceDiagnosticReportResult::Report(
+                    lsp::WorkspaceDiagnosticReport {
+                        items: vec![
+                            lsp::WorkspaceDocumentDiagnosticReport::Full(
+                                lsp::WorkspaceFullDocumentDiagnosticReport {
+                                    uri: lsp::Url::from_file_path(path!("/a/main.rs")).unwrap(),
+                                    version: None,
+                                    full_document_diagnostic_report:
+                                        lsp::FullDocumentDiagnosticReport {
+                                            result_id: Some(format!(
+                                                "workspace_{workspace_request_count}"
+                                            )),
+                                            items: vec![lsp::Diagnostic {
+                                                range: lsp::Range {
+                                                    start: lsp::Position {
+                                                        line: 0,
+                                                        character: 1,
+                                                    },
+                                                    end: lsp::Position {
+                                                        line: 0,
+                                                        character: 3,
+                                                    },
+                                                },
+                                                severity: Some(lsp::DiagnosticSeverity::WARNING),
+                                                message:
+                                                    expected_workspace_pull_diagnostics_main_message
+                                                        .to_string(),
+                                                ..lsp::Diagnostic::default()
+                                            }],
+                                        },
+                                },
+                            ),
+                            lsp::WorkspaceDocumentDiagnosticReport::Full(
+                                lsp::WorkspaceFullDocumentDiagnosticReport {
+                                    uri: lsp::Url::from_file_path(path!("/a/lib.rs")).unwrap(),
+                                    version: None,
+                                    full_document_diagnostic_report:
+                                        lsp::FullDocumentDiagnosticReport {
+                                            result_id: Some(format!(
+                                                "workspace_{workspace_request_count}"
+                                            )),
+                                            items: vec![lsp::Diagnostic {
+                                                range: lsp::Range {
+                                                    start: lsp::Position {
+                                                        line: 0,
+                                                        character: 1,
+                                                    },
+                                                    end: lsp::Position {
+                                                        line: 0,
+                                                        character: 3,
+                                                    },
+                                                },
+                                                severity: Some(lsp::DiagnosticSeverity::WARNING),
+                                                message:
+                                                    expected_workspace_pull_diagnostics_lib_message
+                                                        .to_string(),
+                                                ..lsp::Diagnostic::default()
+                                            }],
+                                        },
+                                },
+                            ),
+                        ],
+                    },
+                ))
+            }
+        },
+    );
+
+    workspace_diagnostics_pulls_handle.next().await.unwrap();
+    assert_eq!(
+        1,
+        workspace_diagnostics_pulls_made.load(atomic::Ordering::Acquire),
+        "Workspace diagnostics should be pulled initially on a server startup"
+    );
+    pull_diagnostics_handle.next().await.unwrap();
+    assert_eq!(
+        1,
+        diagnostics_pulls_made.load(atomic::Ordering::Acquire),
+        "Host should query pull diagnostics when the editor is opened"
+    );
+    executor.run_until_parked();
+    editor_a_main.update(cx_a, |editor, cx| {
+        let snapshot = editor.buffer().read(cx).snapshot(cx);
+        let all_diagnostics = snapshot
+            .diagnostics_in_range(0..snapshot.len())
+            .collect::<Vec<_>>();
+        assert_eq!(
+            all_diagnostics.len(),
+            1,
+            "Expected single diagnostic, but got: {all_diagnostics:?}"
+        );
+        let diagnostic = &all_diagnostics[0];
+        let expected_messages = [
+            expected_workspace_pull_diagnostics_main_message,
+            expected_pull_diagnostic_main_message,
+        ];
+        assert!(
+            expected_messages.contains(&diagnostic.diagnostic.message.as_str()),
+            "Expected {expected_messages:?} on the host, but got: {}",
+            diagnostic.diagnostic.message
+        );
+    });
+
+    fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
+        &lsp::PublishDiagnosticsParams {
+            uri: lsp::Url::from_file_path(path!("/a/main.rs")).unwrap(),
+            diagnostics: vec![lsp::Diagnostic {
+                range: lsp::Range {
+                    start: lsp::Position {
+                        line: 0,
+                        character: 3,
+                    },
+                    end: lsp::Position {
+                        line: 0,
+                        character: 4,
+                    },
+                },
+                severity: Some(lsp::DiagnosticSeverity::INFORMATION),
+                message: expected_push_diagnostic_main_message.to_string(),
+                ..lsp::Diagnostic::default()
+            }],
+            version: None,
+        },
+    );
+    fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
+        &lsp::PublishDiagnosticsParams {
+            uri: lsp::Url::from_file_path(path!("/a/lib.rs")).unwrap(),
+            diagnostics: vec![lsp::Diagnostic {
+                range: lsp::Range {
+                    start: lsp::Position {
+                        line: 0,
+                        character: 3,
+                    },
+                    end: lsp::Position {
+                        line: 0,
+                        character: 4,
+                    },
+                },
+                severity: Some(lsp::DiagnosticSeverity::INFORMATION),
+                message: expected_push_diagnostic_lib_message.to_string(),
+                ..lsp::Diagnostic::default()
+            }],
+            version: None,
+        },
+    );
+    executor.run_until_parked();
+    editor_a_main.update(cx_a, |editor, cx| {
+        let snapshot = editor.buffer().read(cx).snapshot(cx);
+        let all_diagnostics = snapshot
+            .diagnostics_in_range(0..snapshot.len())
+            .collect::<Vec<_>>();
+        assert_eq!(
+            all_diagnostics.len(),
+            2,
+            "Expected pull and push diagnostics, but got: {all_diagnostics:?}"
+        );
+        let expected_messages = [
+            expected_workspace_pull_diagnostics_main_message,
+            expected_pull_diagnostic_main_message,
+            expected_push_diagnostic_main_message,
+        ];
+        for diagnostic in all_diagnostics {
+            assert!(
+                expected_messages.contains(&diagnostic.diagnostic.message.as_str()),
+                "Expected push and pull messages on the host: {expected_messages:?}, but got: {}",
+                diagnostic.diagnostic.message
+            );
+        }
+    });
+
+    let (workspace_b, cx_b) = client_b.build_workspace(&project_b, cx_b);
+    let editor_b_main = workspace_b
+        .update_in(cx_b, |workspace, window, cx| {
+            workspace.open_path((worktree_id, "main.rs"), None, true, window, cx)
+        })
+        .await
+        .unwrap()
+        .downcast::<Editor>()
+        .unwrap();
+
+    pull_diagnostics_handle.next().await.unwrap();
+    assert_eq!(
+        2,
+        diagnostics_pulls_made.load(atomic::Ordering::Acquire),
+        "Client should query pull diagnostics when its editor is opened"
+    );
+    executor.run_until_parked();
+    assert_eq!(
+        1,
+        workspace_diagnostics_pulls_made.load(atomic::Ordering::Acquire),
+        "Workspace diagnostics should not be changed as the remote client does not initialize the workspace diagnostics pull"
+    );
+    editor_b_main.update(cx_b, |editor, cx| {
+        let snapshot = editor.buffer().read(cx).snapshot(cx);
+        let all_diagnostics = snapshot
+            .diagnostics_in_range(0..snapshot.len())
+            .collect::<Vec<_>>();
+        assert_eq!(
+            all_diagnostics.len(),
+            2,
+            "Expected pull and push diagnostics, but got: {all_diagnostics:?}"
+        );
+
+        // Despite the workspace diagnostics not re-initialized for the remote client, we can still expect its message synced from the host.
+        let expected_messages = [
+            expected_workspace_pull_diagnostics_main_message,
+            expected_pull_diagnostic_main_message,
+            expected_push_diagnostic_main_message,
+        ];
+        for diagnostic in all_diagnostics {
+            assert!(
+                expected_messages.contains(&diagnostic.diagnostic.message.as_str()),
+                "The client should get both push and pull messages: {expected_messages:?}, but got: {}",
+                diagnostic.diagnostic.message
+            );
+        }
+    });
+
+    let editor_b_lib = workspace_b
+        .update_in(cx_b, |workspace, window, cx| {
+            workspace.open_path((worktree_id, "lib.rs"), None, true, window, cx)
+        })
+        .await
+        .unwrap()
+        .downcast::<Editor>()
+        .unwrap();
+
+    pull_diagnostics_handle.next().await.unwrap();
+    assert_eq!(
+        3,
+        diagnostics_pulls_made.load(atomic::Ordering::Acquire),
+        "Client should query pull diagnostics when its another editor is opened"
+    );
+    executor.run_until_parked();
+    assert_eq!(
+        1,
+        workspace_diagnostics_pulls_made.load(atomic::Ordering::Acquire),
+        "The remote client still did not anything to trigger the workspace diagnostics pull"
+    );
+    editor_b_lib.update(cx_b, |editor, cx| {
+        let snapshot = editor.buffer().read(cx).snapshot(cx);
+        let all_diagnostics = snapshot
+            .diagnostics_in_range(0..snapshot.len())
+            .collect::<Vec<_>>();
+        let expected_messages = [
+            expected_pull_diagnostic_lib_message,
+            // TODO bug: the pushed diagnostics are not being sent to the client when they open the corresponding buffer.
+            // expected_push_diagnostic_lib_message,
+        ];
+        assert_eq!(
+            all_diagnostics.len(),
+            1,
+            "Expected pull diagnostics, but got: {all_diagnostics:?}"
+        );
+        for diagnostic in all_diagnostics {
+            assert!(
+                expected_messages.contains(&diagnostic.diagnostic.message.as_str()),
+                "The client should get both push and pull messages: {expected_messages:?}, but got: {}",
+                diagnostic.diagnostic.message
+            );
+        }
+    });
+    {
+        assert_eq!(
+            &BTreeSet::from_iter([None]),
+            diagnostics_pulls_result_ids.lock().await.deref(),
+            "Initial diagnostics pulls should not reuse any result ids"
+        );
+        assert_eq!(
+            0,
+            workspace_diagnostics_pulls_result_ids
+                .lock()
+                .await
+                .deref()
+                .len(),
+            "After the initial workspace request, opening files should not reuse any result ids"
+        );
+    }
+
+    editor_b_lib.update_in(cx_b, |editor, window, cx| {
+        editor.move_to_end(&MoveToEnd, window, cx);
+        editor.handle_input(":", window, cx);
+    });
+    pull_diagnostics_handle.next().await.unwrap();
+    assert_eq!(
+        4,
+        diagnostics_pulls_made.load(atomic::Ordering::Acquire),
+        "Client lib.rs edits should trigger another diagnostics pull for a buffer"
+    );
+    workspace_diagnostics_pulls_handle.next().await.unwrap();
+    assert_eq!(
+        2,
+        workspace_diagnostics_pulls_made.load(atomic::Ordering::Acquire),
+        "After client lib.rs edits, the workspace diagnostics request should follow"
+    );
+    executor.run_until_parked();
+    {
+        assert_eq!(
+            2,
+            diagnostics_pulls_result_ids.lock().await.len(),
+            "One new id for a client's lib.rs pull"
+        );
+        assert_eq!(
+            2,
+            workspace_diagnostics_pulls_result_ids.lock().await.len(),
+            "Two more entries for 2 currently opened files that previously pulled the diagnostics"
+        );
+    }
+
+    editor_b_main.update_in(cx_b, |editor, window, cx| {
+        editor.move_to_end(&MoveToEnd, window, cx);
+        editor.handle_input(":", window, cx);
+    });
+    pull_diagnostics_handle.next().await.unwrap();
+    pull_diagnostics_handle.next().await.unwrap();
+    assert_eq!(
+        6,
+        diagnostics_pulls_made.load(atomic::Ordering::Acquire),
+        "Client main.rs edits should trigger another diagnostics pull by both client and host as they share the buffer"
+    );
+    workspace_diagnostics_pulls_handle.next().await.unwrap();
+    assert_eq!(
+        3,
+        workspace_diagnostics_pulls_made.load(atomic::Ordering::Acquire),
+        "After client main.rs edits, the workspace diagnostics pull should follow"
+    );
+    executor.run_until_parked();
+    {
+        assert_eq!(
+            3,
+            diagnostics_pulls_result_ids.lock().await.len(),
+            "One new id for a client's main.rs pull"
+        );
+        assert_eq!(4, workspace_diagnostics_pulls_result_ids.lock().await.len());
+    }
+
+    editor_a_main.update_in(cx_a, |editor, window, cx| {
+        editor.move_to_end(&MoveToEnd, window, cx);
+        editor.handle_input(":", window, cx);
+    });
+    pull_diagnostics_handle.next().await.unwrap();
+    pull_diagnostics_handle.next().await.unwrap();
+    assert_eq!(
+        8,
+        diagnostics_pulls_made.load(atomic::Ordering::Acquire),
+        "Host main.rs edits should trigger another diagnostics pull by both client and host as they share the buffer"
+    );
+    workspace_diagnostics_pulls_handle.next().await.unwrap();
+    assert_eq!(
+        4,
+        workspace_diagnostics_pulls_made.load(atomic::Ordering::Acquire),
+        "After host main.rs edits, the workspace diagnostics pull should follow"
+    );
+    executor.run_until_parked();
+    {
+        assert_eq!(5, diagnostics_pulls_result_ids.lock().await.len());
+        assert_eq!(6, workspace_diagnostics_pulls_result_ids.lock().await.len());
+    }
+
+    fake_language_server
+        .request::<lsp::request::WorkspaceDiagnosticRefresh>(())
+        .await
+        .into_response()
+        .expect("workspace diagnostics refresh request failed");
+    assert_eq!(
+        8,
+        diagnostics_pulls_made.load(atomic::Ordering::Acquire),
+        "No single file pulls should happen after the diagnostics refresh server request"
+    );
+    workspace_diagnostics_pulls_handle.next().await.unwrap();
+    assert_eq!(
+        5,
+        workspace_diagnostics_pulls_made.load(atomic::Ordering::Acquire),
+        "Another workspace diagnostics pull should happen after the diagnostics refresh server request"
+    );
+    {
+        assert_eq!(
+            5,
+            diagnostics_pulls_result_ids.lock().await.len(),
+            "Pulls should not happen hence no extra ids should appear"
+        );
+        assert_eq!(8, workspace_diagnostics_pulls_result_ids.lock().await.len(),);
+    }
+    editor_b_lib.update(cx_b, |editor, cx| {
+        let snapshot = editor.buffer().read(cx).snapshot(cx);
+        let all_diagnostics = snapshot
+            .diagnostics_in_range(0..snapshot.len())
+            .collect::<Vec<_>>();
+        let expected_messages = [
+            expected_workspace_pull_diagnostics_lib_message,
+            expected_pull_diagnostic_lib_message,
+            expected_push_diagnostic_lib_message,
+        ];
+        assert_eq!(all_diagnostics.len(), 1);
+        for diagnostic in &all_diagnostics {
+            assert!(
+                expected_messages.contains(&diagnostic.diagnostic.message.as_str()),
+                "Unexpected diagnostics: {all_diagnostics:?}"
+            );
+        }
+    });
+    editor_b_main.update(cx_b, |editor, cx| {
+        let snapshot = editor.buffer().read(cx).snapshot(cx);
+        let all_diagnostics = snapshot
+            .diagnostics_in_range(0..snapshot.len())
+            .collect::<Vec<_>>();
+        assert_eq!(all_diagnostics.len(), 2);
+
+        let expected_messages = [
+            expected_workspace_pull_diagnostics_main_message,
+            expected_pull_diagnostic_main_message,
+            expected_push_diagnostic_main_message,
+        ];
+        for diagnostic in &all_diagnostics {
+            assert!(
+                expected_messages.contains(&diagnostic.diagnostic.message.as_str()),
+                "Unexpected diagnostics: {all_diagnostics:?}"
+            );
+        }
+    });
+    editor_a_main.update(cx_a, |editor, cx| {
+        let snapshot = editor.buffer().read(cx).snapshot(cx);
+        let all_diagnostics = snapshot
+            .diagnostics_in_range(0..snapshot.len())
+            .collect::<Vec<_>>();
+        assert_eq!(all_diagnostics.len(), 2);
+        let expected_messages = [
+            expected_workspace_pull_diagnostics_main_message,
+            expected_pull_diagnostic_main_message,
+            expected_push_diagnostic_main_message,
+        ];
+        for diagnostic in &all_diagnostics {
+            assert!(
+                expected_messages.contains(&diagnostic.diagnostic.message.as_str()),
+                "Unexpected diagnostics: {all_diagnostics:?}"
+            );
+        }
+    });
+}
+
 #[gpui::test(iterations = 10)]
 async fn test_git_blame_is_forwarded(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
     let mut server = TestServer::start(cx_a.executor()).await;

crates/project/src/lsp_command.rs 🔗

@@ -3902,6 +3902,7 @@ impl GetDocumentDiagnostics {
     }
 }
 
+#[derive(Debug)]
 pub struct WorkspaceLspPullDiagnostics {
     pub version: Option<i32>,
     pub diagnostics: LspPullDiagnostics,

crates/project/src/lsp_store.rs 🔗

@@ -491,7 +491,7 @@ impl LocalLspStore {
                                 None,
                                 DiagnosticSourceKind::Pushed,
                                 &adapter.disk_based_diagnostic_sources,
-                                |diagnostic, cx| match diagnostic.source_kind {
+                                |_, diagnostic, cx| match diagnostic.source_kind {
                                     DiagnosticSourceKind::Other | DiagnosticSourceKind::Pushed => {
                                         adapter.retain_old_diagnostic(diagnostic, cx)
                                     }
@@ -6008,10 +6008,15 @@ impl LspStore {
         buffer: Entity<Buffer>,
         cx: &mut Context<Self>,
     ) -> Task<anyhow::Result<()>> {
+        let buffer_id = buffer.read(cx).remote_id();
         let diagnostics = self.pull_diagnostics(buffer, cx);
         cx.spawn(async move |lsp_store, cx| {
             let diagnostics = diagnostics.await.context("pulling diagnostics")?;
             lsp_store.update(cx, |lsp_store, cx| {
+                if lsp_store.as_local().is_none() {
+                    return;
+                }
+
                 for diagnostics_set in diagnostics {
                     let LspPullDiagnostics::Response {
                         server_id,
@@ -6040,7 +6045,7 @@ impl LspStore {
                                     Some(result_id),
                                     DiagnosticSourceKind::Pulled,
                                     disk_based_sources,
-                                    |_, _| true,
+                                    |_, _, _| true,
                                     cx,
                                 )
                                 .log_err();
@@ -6060,8 +6065,10 @@ impl LspStore {
                                     result_id,
                                     DiagnosticSourceKind::Pulled,
                                     disk_based_sources,
-                                    |old_diagnostic, _| match old_diagnostic.source_kind {
-                                        DiagnosticSourceKind::Pulled => false,
+                                    |buffer, old_diagnostic, _| match old_diagnostic.source_kind {
+                                        DiagnosticSourceKind::Pulled => {
+                                            buffer.remote_id() != buffer_id
+                                        }
                                         DiagnosticSourceKind::Other
                                         | DiagnosticSourceKind::Pushed => true,
                                     },
@@ -7077,19 +7084,19 @@ impl LspStore {
             result_id,
             version,
             diagnostics,
-            |_, _| false,
+            |_, _, _| false,
             cx,
         )
     }
 
-    pub fn merge_diagnostic_entries<F: Fn(&Diagnostic, &App) -> bool + Clone>(
+    pub fn merge_diagnostic_entries(
         &mut self,
         server_id: LanguageServerId,
         abs_path: PathBuf,
         result_id: Option<String>,
         version: Option<i32>,
         mut diagnostics: Vec<DiagnosticEntry<Unclipped<PointUtf16>>>,
-        filter: F,
+        filter: impl Fn(&Buffer, &Diagnostic, &App) -> bool + Clone,
         cx: &mut Context<Self>,
     ) -> anyhow::Result<()> {
         let Some((worktree, relative_path)) =
@@ -7111,26 +7118,30 @@ impl LspStore {
                 .get_diagnostics(server_id)
                 .into_iter()
                 .flat_map(|diag| {
-                    diag.iter().filter(|v| filter(&v.diagnostic, cx)).map(|v| {
-                        let start = Unclipped(v.range.start.to_point_utf16(&snapshot));
-                        let end = Unclipped(v.range.end.to_point_utf16(&snapshot));
-                        DiagnosticEntry {
-                            range: start..end,
-                            diagnostic: v.diagnostic.clone(),
-                        }
-                    })
+                    diag.iter()
+                        .filter(|v| filter(buffer, &v.diagnostic, cx))
+                        .map(|v| {
+                            let start = Unclipped(v.range.start.to_point_utf16(&snapshot));
+                            let end = Unclipped(v.range.end.to_point_utf16(&snapshot));
+                            DiagnosticEntry {
+                                range: start..end,
+                                diagnostic: v.diagnostic.clone(),
+                            }
+                        })
                 })
                 .collect::<Vec<_>>();
 
-            self.as_local_mut().unwrap().update_buffer_diagnostics(
-                &buffer_handle,
-                server_id,
-                result_id,
-                version,
-                diagnostics.clone(),
-                reused_diagnostics.clone(),
-                cx,
-            )?;
+            self.as_local_mut()
+                .context("cannot merge diagnostics on a remote LspStore")?
+                .update_buffer_diagnostics(
+                    &buffer_handle,
+                    server_id,
+                    result_id,
+                    version,
+                    diagnostics.clone(),
+                    reused_diagnostics.clone(),
+                    cx,
+                )?;
 
             diagnostics.extend(reused_diagnostics);
         }
@@ -7678,55 +7689,15 @@ impl LspStore {
                         buffer.wait_for_version(deserialize_version(&message.version))
                     })?
                     .await?;
-                let pull_diagnostics = lsp_store.update(&mut cx, |lsp_store, cx| {
-                    let server_ids = buffer.update(cx, |buffer, cx| {
-                        lsp_store
-                            .language_servers_for_local_buffer(buffer, cx)
-                            .map(|(_, server)| server.server_id())
-                            .collect::<Vec<_>>()
-                    });
-
-                    server_ids
-                        .into_iter()
-                        .map(|server_id| {
-                            let result_id = lsp_store.result_id(server_id, buffer_id, cx);
-                            let task = lsp_store.request_lsp(
-                                buffer.clone(),
-                                LanguageServerToQuery::Other(server_id),
-                                GetDocumentDiagnostics {
-                                    previous_result_id: result_id,
-                                },
-                                cx,
-                            );
-                            async move { (server_id, task.await) }
-                        })
-                        .collect::<Vec<_>>()
-                })?;
-
-                let all_diagnostics_responses = join_all(pull_diagnostics).await;
-                let mut all_diagnostics = Vec::new();
-                for (server_id, response) in all_diagnostics_responses {
-                    all_diagnostics.push((server_id, response?));
-                }
-
-                lsp_store.update(&mut cx, |project, cx| proto::MultiLspQueryResponse {
-                    responses: all_diagnostics
-                        .into_iter()
-                        .map(|(server_id, lsp_diagnostic)| proto::LspResponse {
-                            server_id: server_id.to_proto(),
-                            response: Some(
-                                proto::lsp_response::Response::GetDocumentDiagnosticsResponse(
-                                    GetDocumentDiagnostics::response_to_proto(
-                                        lsp_diagnostic,
-                                        project,
-                                        sender_id,
-                                        &buffer_version,
-                                        cx,
-                                    ),
-                                ),
-                            ),
-                        })
-                        .collect(),
+                lsp_store
+                    .update(&mut cx, |lsp_store, cx| {
+                        lsp_store.pull_diagnostics_for_buffer(buffer, cx)
+                    })?
+                    .await?;
+                // `pull_diagnostics_for_buffer` will merge in the new diagnostics and send them to the client.
+                // The client cannot merge anything into its non-local LspStore, so we do not need to return anything.
+                Ok(proto::MultiLspQueryResponse {
+                    responses: Vec::new(),
                 })
             }
             Some(proto::multi_lsp_query::Request::GetDocumentColor(message)) => {
@@ -9510,24 +9481,22 @@ impl LspStore {
             result_id,
             source_kind,
             disk_based_sources,
-            |_, _| false,
+            |_, _, _| false,
             cx,
         )
     }
 
-    pub fn merge_diagnostics<F: Fn(&Diagnostic, &App) -> bool + Clone>(
+    pub fn merge_diagnostics(
         &mut self,
         language_server_id: LanguageServerId,
         mut params: lsp::PublishDiagnosticsParams,
         result_id: Option<String>,
         source_kind: DiagnosticSourceKind,
         disk_based_sources: &[String],
-        filter: F,
+        filter: impl Fn(&Buffer, &Diagnostic, &App) -> bool + Clone,
         cx: &mut Context<Self>,
     ) -> Result<()> {
-        if !self.mode.is_local() {
-            anyhow::bail!("called update_diagnostics on remote");
-        }
+        anyhow::ensure!(self.mode.is_local(), "called update_diagnostics on remote");
         let abs_path = params
             .uri
             .to_file_path()
@@ -10470,7 +10439,7 @@ fn lsp_workspace_diagnostics_refresh(
                                                     Some(result_id),
                                                     DiagnosticSourceKind::Pulled,
                                                     disk_based_sources,
-                                                    |_, _| true,
+                                                    |_, _, _| true,
                                                     cx,
                                                 )
                                                 .log_err();
@@ -10490,8 +10459,12 @@ fn lsp_workspace_diagnostics_refresh(
                                                     result_id,
                                                     DiagnosticSourceKind::Pulled,
                                                     disk_based_sources,
-                                                    |old_diagnostic, _| match old_diagnostic.source_kind {
-                                                        DiagnosticSourceKind::Pulled => false,
+                                                    |buffer, old_diagnostic, cx| match old_diagnostic.source_kind {
+                                                        DiagnosticSourceKind::Pulled => {
+                                                            let buffer_url = File::from_dyn(buffer.file()).map(|f| f.abs_path(cx))
+                                                                .and_then(|abs_path| file_path_to_lsp_url(&abs_path).ok());
+                                                            buffer_url.is_none_or(|buffer_url| buffer_url != uri)
+                                                        },
                                                         DiagnosticSourceKind::Other
                                                         | DiagnosticSourceKind::Pushed => true,
                                                     },

crates/project/src/lsp_store/clangd_ext.rs 🔗

@@ -87,7 +87,7 @@ pub fn register_notifications(
                         None,
                         DiagnosticSourceKind::Pushed,
                         &adapter.disk_based_diagnostic_sources,
-                        |diag, _| !is_inactive_region(diag),
+                        |_, diag, _| !is_inactive_region(diag),
                         cx,
                     )
                     .log_err();