Merge pull request #1738 from zed-industries/out-of-sync-diagnostics

Max Brunsfeld created

Fix bugs that caused guests to see different diagnostics than host

Change summary

crates/collab/src/integration_tests.rs | 84 ++++++++++++++++++---------
crates/collab/src/rpc.rs               | 15 +++++
crates/project/src/project.rs          | 14 ++--
crates/project/src/worktree.rs         | 21 ++++---
4 files changed, 89 insertions(+), 45 deletions(-)

Detailed changes

crates/collab/src/integration_tests.rs 🔗

@@ -2159,15 +2159,10 @@ async fn test_collaborating_with_diagnostics(
         )
         .await;
     let (project_a, worktree_id) = client_a.build_local_project("/a", cx_a).await;
-    let project_id = active_call_a
-        .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
-        .await
-        .unwrap();
 
     // Cause the language server to start.
-    let _buffer = cx_a
-        .background()
-        .spawn(project_a.update(cx_a, |project, cx| {
+    let _buffer = project_a
+        .update(cx_a, |project, cx| {
             project.open_buffer(
                 ProjectPath {
                     worktree_id,
@@ -2175,18 +2170,35 @@ async fn test_collaborating_with_diagnostics(
                 },
                 cx,
             )
-        }))
+        })
         .await
         .unwrap();
 
-    // Join the worktree as client B.
-    let project_b = client_b.build_remote_project(project_id, cx_b).await;
-
     // Simulate a language server reporting errors for a file.
     let mut fake_language_server = fake_language_servers.next().await.unwrap();
     fake_language_server
         .receive_notification::<lsp::notification::DidOpenTextDocument>()
         .await;
+    fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
+        lsp::PublishDiagnosticsParams {
+            uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
+            version: None,
+            diagnostics: vec![lsp::Diagnostic {
+                severity: Some(lsp::DiagnosticSeverity::WARNING),
+                range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
+                message: "message 0".to_string(),
+                ..Default::default()
+            }],
+        },
+    );
+
+    // Client A shares the project and, simultaneously, the language server
+    // publishes a diagnostic. This is done to ensure that the server always
+    // observes the latest diagnostics for a worktree.
+    let project_id = active_call_a
+        .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
+        .await
+        .unwrap();
     fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
         lsp::PublishDiagnosticsParams {
             uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
@@ -2200,6 +2212,9 @@ async fn test_collaborating_with_diagnostics(
         },
     );
 
+    // Join the worktree as client B.
+    let project_b = client_b.build_remote_project(project_id, cx_b).await;
+
     // Wait for server to see the diagnostics update.
     deterministic.run_until_parked();
     {
@@ -2229,24 +2244,35 @@ async fn test_collaborating_with_diagnostics(
 
     // Join project as client C and observe the diagnostics.
     let project_c = client_c.build_remote_project(project_id, cx_c).await;
-    deterministic.run_until_parked();
-    project_c.read_with(cx_c, |project, cx| {
-        assert_eq!(
-            project.diagnostic_summaries(cx).collect::<Vec<_>>(),
-            &[(
-                ProjectPath {
-                    worktree_id,
-                    path: Arc::from(Path::new("a.rs")),
-                },
-                DiagnosticSummary {
-                    error_count: 1,
-                    warning_count: 0,
-                    ..Default::default()
-                },
-            )]
-        )
+    let project_c_diagnostic_summaries = Rc::new(RefCell::new(Vec::new()));
+    project_c.update(cx_c, |_, cx| {
+        let summaries = project_c_diagnostic_summaries.clone();
+        cx.subscribe(&project_c, {
+            move |p, _, event, cx| {
+                if let project::Event::DiskBasedDiagnosticsFinished { .. } = event {
+                    *summaries.borrow_mut() = p.diagnostic_summaries(cx).collect();
+                }
+            }
+        })
+        .detach();
     });
 
+    deterministic.run_until_parked();
+    assert_eq!(
+        project_c_diagnostic_summaries.borrow().as_slice(),
+        &[(
+            ProjectPath {
+                worktree_id,
+                path: Arc::from(Path::new("a.rs")),
+            },
+            DiagnosticSummary {
+                error_count: 1,
+                warning_count: 0,
+                ..Default::default()
+            },
+        )]
+    );
+
     // Simulate a language server reporting more errors for a file.
     fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
         lsp::PublishDiagnosticsParams {
@@ -2321,7 +2347,7 @@ async fn test_collaborating_with_diagnostics(
                 DiagnosticEntry {
                     range: Point::new(0, 4)..Point::new(0, 7),
                     diagnostic: Diagnostic {
-                        group_id: 1,
+                        group_id: 2,
                         message: "message 1".to_string(),
                         severity: lsp::DiagnosticSeverity::ERROR,
                         is_primary: true,
@@ -2331,7 +2357,7 @@ async fn test_collaborating_with_diagnostics(
                 DiagnosticEntry {
                     range: Point::new(0, 10)..Point::new(0, 13),
                     diagnostic: Diagnostic {
-                        group_id: 2,
+                        group_id: 3,
                         severity: lsp::DiagnosticSeverity::WARNING,
                         message: "message 2".to_string(),
                         is_primary: true,

crates/collab/src/rpc.rs 🔗

@@ -1012,6 +1012,21 @@ impl Server {
             }
         }
 
+        for language_server in &project.language_servers {
+            self.peer.send(
+                request.sender_id,
+                proto::UpdateLanguageServer {
+                    project_id: project_id.to_proto(),
+                    language_server_id: language_server.id,
+                    variant: Some(
+                        proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
+                            proto::LspDiskBasedDiagnosticsUpdated {},
+                        ),
+                    ),
+                },
+            )?;
+        }
+
         Ok(())
     }
 

crates/project/src/project.rs 🔗

@@ -1084,13 +1084,6 @@ impl Project {
                 }
             }
 
-            for worktree in self.worktrees(cx).collect::<Vec<_>>() {
-                worktree.update(cx, |worktree, cx| {
-                    let worktree = worktree.as_local_mut().unwrap();
-                    worktree_share_tasks.push(worktree.share(project_id, cx));
-                });
-            }
-
             for (server_id, status) in &self.language_server_statuses {
                 self.client
                     .send(proto::StartLanguageServer {
@@ -1103,6 +1096,13 @@ impl Project {
                     .log_err();
             }
 
+            for worktree in self.worktrees(cx).collect::<Vec<_>>() {
+                worktree.update(cx, |worktree, cx| {
+                    let worktree = worktree.as_local_mut().unwrap();
+                    worktree_share_tasks.push(worktree.share(project_id, cx));
+                });
+            }
+
             self.client_subscriptions
                 .push(self.client.add_model_for_remote_entity(project_id, cx));
             self.metadata_changed(cx);

crates/project/src/worktree.rs 🔗

@@ -959,9 +959,20 @@ impl LocalWorktree {
             let (snapshots_tx, mut snapshots_rx) = watch::channel_with(self.snapshot());
             let rpc = self.client.clone();
             let worktree_id = cx.model_id() as u64;
+
+            for (path, summary) in self.diagnostic_summaries.iter() {
+                if let Err(e) = rpc.send(proto::UpdateDiagnosticSummary {
+                    project_id,
+                    worktree_id,
+                    summary: Some(summary.to_proto(&path.0)),
+                }) {
+                    return Task::ready(Err(e));
+                }
+            }
+
             let maintain_remote_snapshot = cx.background().spawn({
                 let rpc = rpc;
-                let diagnostic_summaries = self.diagnostic_summaries.clone();
+
                 async move {
                     let mut prev_snapshot = match snapshots_rx.recv().await {
                         Some(snapshot) => {
@@ -994,14 +1005,6 @@ impl LocalWorktree {
                         }
                     };
 
-                    for (path, summary) in diagnostic_summaries.iter() {
-                        rpc.send(proto::UpdateDiagnosticSummary {
-                            project_id,
-                            worktree_id,
-                            summary: Some(summary.to_proto(&path.0)),
-                        })?;
-                    }
-
                     while let Some(snapshot) = snapshots_rx.recv().await {
                         send_worktree_update(
                             &rpc,