Merge pull request #2409 from zed-industries/stale-excerpts

Max Brunsfeld created

Fix stale project diagnostic excerpts for guests

Change summary

crates/collab/src/tests/integration_tests.rs | 140 +++++++++++
crates/project/src/project.rs                | 258 +++++++++++++--------
2 files changed, 293 insertions(+), 105 deletions(-)

Detailed changes

crates/collab/src/tests/integration_tests.rs 🔗

@@ -32,7 +32,10 @@ use std::{
     env, future, mem,
     path::{Path, PathBuf},
     rc::Rc,
-    sync::Arc,
+    sync::{
+        atomic::{AtomicBool, Ordering::SeqCst},
+        Arc,
+    },
 };
 use unindent::Unindent as _;
 use workspace::{
@@ -3636,6 +3639,141 @@ async fn test_collaborating_with_diagnostics(
     });
 }
 
+#[gpui::test(iterations = 10)]
+async fn test_collaborating_with_lsp_progress_updates_and_diagnostics_ordering(
+    deterministic: Arc<Deterministic>,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+) {
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(&deterministic).await;
+    let client_a = server.create_client(cx_a, "user_a").await;
+    let client_b = server.create_client(cx_b, "user_b").await;
+    server
+        .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)])
+        .await;
+
+    // Set up a fake language server.
+    let mut language = Language::new(
+        LanguageConfig {
+            name: "Rust".into(),
+            path_suffixes: vec!["rs".to_string()],
+            ..Default::default()
+        },
+        Some(tree_sitter_rust::language()),
+    );
+    let mut fake_language_servers = language
+        .set_fake_lsp_adapter(Arc::new(FakeLspAdapter {
+            disk_based_diagnostics_progress_token: Some("the-disk-based-token".into()),
+            disk_based_diagnostics_sources: vec!["the-disk-based-diagnostics-source".into()],
+            ..Default::default()
+        }))
+        .await;
+    client_a.language_registry.add(Arc::new(language));
+
+    let file_names = &["one.rs", "two.rs", "three.rs", "four.rs", "five.rs"];
+    client_a
+        .fs
+        .insert_tree(
+            "/test",
+            json!({
+                "one.rs": "const ONE: usize = 1;",
+                "two.rs": "const TWO: usize = 2;",
+                "three.rs": "const THREE: usize = 3;",
+                "four.rs": "const FOUR: usize = 3;",
+                "five.rs": "const FIVE: usize = 3;",
+            }),
+        )
+        .await;
+
+    let (project_a, worktree_id) = client_a.build_local_project("/test", cx_a).await;
+
+    // Share a project as client A
+    let active_call_a = cx_a.read(ActiveCall::global);
+    let project_id = active_call_a
+        .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
+        .await
+        .unwrap();
+
+    // Join the project as client B and open all three files.
+    let project_b = client_b.build_remote_project(project_id, cx_b).await;
+    let guest_buffers = futures::future::try_join_all(file_names.iter().map(|file_name| {
+        project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, file_name), cx))
+    }))
+    .await
+    .unwrap();
+
+    // Simulate a language server reporting errors for a file.
+    let fake_language_server = fake_language_servers.next().await.unwrap();
+    fake_language_server
+        .request::<lsp::request::WorkDoneProgressCreate>(lsp::WorkDoneProgressCreateParams {
+            token: lsp::NumberOrString::String("the-disk-based-token".to_string()),
+        })
+        .await
+        .unwrap();
+    fake_language_server.notify::<lsp::notification::Progress>(lsp::ProgressParams {
+        token: lsp::NumberOrString::String("the-disk-based-token".to_string()),
+        value: lsp::ProgressParamsValue::WorkDone(lsp::WorkDoneProgress::Begin(
+            lsp::WorkDoneProgressBegin {
+                title: "Progress Began".into(),
+                ..Default::default()
+            },
+        )),
+    });
+    for file_name in file_names {
+        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
+            lsp::PublishDiagnosticsParams {
+                uri: lsp::Url::from_file_path(Path::new("/test").join(file_name)).unwrap(),
+                version: None,
+                diagnostics: vec![lsp::Diagnostic {
+                    severity: Some(lsp::DiagnosticSeverity::WARNING),
+                    source: Some("the-disk-based-diagnostics-source".into()),
+                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
+                    message: "message one".to_string(),
+                    ..Default::default()
+                }],
+            },
+        );
+    }
+    fake_language_server.notify::<lsp::notification::Progress>(lsp::ProgressParams {
+        token: lsp::NumberOrString::String("the-disk-based-token".to_string()),
+        value: lsp::ProgressParamsValue::WorkDone(lsp::WorkDoneProgress::End(
+            lsp::WorkDoneProgressEnd { message: None },
+        )),
+    });
+
+    // When the "disk base diagnostics finished" message is received, the buffers'
+    // diagnostics are expected to be present.
+    let disk_based_diagnostics_finished = Arc::new(AtomicBool::new(false));
+    project_b.update(cx_b, {
+        let project_b = project_b.clone();
+        let disk_based_diagnostics_finished = disk_based_diagnostics_finished.clone();
+        move |_, cx| {
+            cx.subscribe(&project_b, move |_, _, event, cx| {
+                if let project::Event::DiskBasedDiagnosticsFinished { .. } = event {
+                    disk_based_diagnostics_finished.store(true, SeqCst);
+                    for buffer in &guest_buffers {
+                        assert_eq!(
+                            buffer
+                                .read(cx)
+                                .snapshot()
+                                .diagnostics_in_range::<_, usize>(0..5, false)
+                                .count(),
+                            1,
+                            "expected a diagnostic for buffer {:?}",
+                            buffer.read(cx).file().unwrap().path(),
+                        );
+                    }
+                }
+            })
+            .detach();
+        }
+    });
+
+    deterministic.run_until_parked();
+    assert!(disk_based_diagnostics_finished.load(SeqCst));
+}
+
 #[gpui::test(iterations = 10)]
 async fn test_collaborating_with_completion(
     deterministic: Arc<Deterministic>,

crates/project/src/project.rs 🔗

@@ -93,7 +93,7 @@ pub trait Item {
 pub struct Project {
     worktrees: Vec<WorktreeHandle>,
     active_entry: Option<ProjectEntryId>,
-    buffer_changes_tx: mpsc::UnboundedSender<BufferMessage>,
+    buffer_ordered_messages_tx: mpsc::UnboundedSender<BufferOrderedMessage>,
     languages: Arc<LanguageRegistry>,
     language_servers: HashMap<LanguageServerId, LanguageServerState>,
     language_server_ids: HashMap<(WorktreeId, LanguageServerName), LanguageServerId>,
@@ -137,11 +137,16 @@ struct LspBufferSnapshot {
     snapshot: TextBufferSnapshot,
 }
 
-enum BufferMessage {
+/// Message ordered with respect to buffer operations
+enum BufferOrderedMessage {
     Operation {
         buffer_id: u64,
         operation: proto::Operation,
     },
+    LanguageServerUpdate {
+        language_server_id: LanguageServerId,
+        message: proto::update_language_server::Variant,
+    },
     Resync,
 }
 
@@ -443,11 +448,11 @@ impl Project {
     ) -> ModelHandle<Self> {
         cx.add_model(|cx: &mut ModelContext<Self>| {
             let (tx, rx) = mpsc::unbounded();
-            cx.spawn_weak(|this, cx| Self::send_buffer_messages(this, rx, cx))
+            cx.spawn_weak(|this, cx| Self::send_buffer_ordered_messages(this, rx, cx))
                 .detach();
             Self {
                 worktrees: Default::default(),
-                buffer_changes_tx: tx,
+                buffer_ordered_messages_tx: tx,
                 collaborators: Default::default(),
                 opened_buffers: Default::default(),
                 shared_buffers: Default::default(),
@@ -511,11 +516,11 @@ impl Project {
             }
 
             let (tx, rx) = mpsc::unbounded();
-            cx.spawn_weak(|this, cx| Self::send_buffer_messages(this, rx, cx))
+            cx.spawn_weak(|this, cx| Self::send_buffer_ordered_messages(this, rx, cx))
                 .detach();
             let mut this = Self {
                 worktrees: Vec::new(),
-                buffer_changes_tx: tx,
+                buffer_ordered_messages_tx: tx,
                 loading_buffers_by_path: Default::default(),
                 opened_buffer: watch::channel(),
                 shared_buffers: Default::default(),
@@ -1168,8 +1173,8 @@ impl Project {
                 )
             })
             .collect();
-        self.buffer_changes_tx
-            .unbounded_send(BufferMessage::Resync)
+        self.buffer_ordered_messages_tx
+            .unbounded_send(BufferOrderedMessage::Resync)
             .unwrap();
         cx.notify();
         Ok(())
@@ -1784,23 +1789,49 @@ impl Project {
         }
     }
 
-    async fn send_buffer_messages(
+    async fn send_buffer_ordered_messages(
         this: WeakModelHandle<Self>,
-        rx: UnboundedReceiver<BufferMessage>,
+        rx: UnboundedReceiver<BufferOrderedMessage>,
         mut cx: AsyncAppContext,
     ) -> Option<()> {
         const MAX_BATCH_SIZE: usize = 128;
 
-        let mut needs_resync_with_host = false;
         let mut operations_by_buffer_id = HashMap::default();
+        async fn flush_operations(
+            this: &ModelHandle<Project>,
+            operations_by_buffer_id: &mut HashMap<u64, Vec<proto::Operation>>,
+            needs_resync_with_host: &mut bool,
+            is_local: bool,
+            cx: &AsyncAppContext,
+        ) {
+            for (buffer_id, operations) in operations_by_buffer_id.drain() {
+                let request = this.read_with(cx, |this, _| {
+                    let project_id = this.remote_id()?;
+                    Some(this.client.request(proto::UpdateBuffer {
+                        buffer_id,
+                        project_id,
+                        operations,
+                    }))
+                });
+                if let Some(request) = request {
+                    if request.await.is_err() && !is_local {
+                        *needs_resync_with_host = true;
+                        break;
+                    }
+                }
+            }
+        }
+
+        let mut needs_resync_with_host = false;
         let mut changes = rx.ready_chunks(MAX_BATCH_SIZE);
+
         while let Some(changes) = changes.next().await {
             let this = this.upgrade(&mut cx)?;
             let is_local = this.read_with(&cx, |this, _| this.is_local());
 
             for change in changes {
                 match change {
-                    BufferMessage::Operation {
+                    BufferOrderedMessage::Operation {
                         buffer_id,
                         operation,
                     } => {
@@ -1813,7 +1844,8 @@ impl Project {
                             .or_insert(Vec::new())
                             .push(operation);
                     }
-                    BufferMessage::Resync => {
+
+                    BufferOrderedMessage::Resync => {
                         operations_by_buffer_id.clear();
                         if this
                             .update(&mut cx, |this, cx| this.synchronize_remote_buffers(cx))
@@ -1823,25 +1855,43 @@ impl Project {
                             needs_resync_with_host = false;
                         }
                     }
-                }
-            }
 
-            for (buffer_id, operations) in operations_by_buffer_id.drain() {
-                let request = this.read_with(&cx, |this, _| {
-                    let project_id = this.remote_id()?;
-                    Some(this.client.request(proto::UpdateBuffer {
-                        buffer_id,
-                        project_id,
-                        operations,
-                    }))
-                });
-                if let Some(request) = request {
-                    if request.await.is_err() && !is_local {
-                        needs_resync_with_host = true;
-                        break;
+                    BufferOrderedMessage::LanguageServerUpdate {
+                        language_server_id,
+                        message,
+                    } => {
+                        flush_operations(
+                            &this,
+                            &mut operations_by_buffer_id,
+                            &mut needs_resync_with_host,
+                            is_local,
+                            &cx,
+                        )
+                        .await;
+
+                        this.read_with(&cx, |this, _| {
+                            if let Some(project_id) = this.remote_id() {
+                                this.client
+                                    .send(proto::UpdateLanguageServer {
+                                        project_id,
+                                        language_server_id: language_server_id.0 as u64,
+                                        variant: Some(message),
+                                    })
+                                    .log_err();
+                            }
+                        });
                     }
                 }
             }
+
+            flush_operations(
+                &this,
+                &mut operations_by_buffer_id,
+                &mut needs_resync_with_host,
+                is_local,
+                &cx,
+            )
+            .await;
         }
 
         None
@@ -1855,8 +1905,8 @@ impl Project {
     ) -> Option<()> {
         match event {
             BufferEvent::Operation(operation) => {
-                self.buffer_changes_tx
-                    .unbounded_send(BufferMessage::Operation {
+                self.buffer_ordered_messages_tx
+                    .unbounded_send(BufferOrderedMessage::Operation {
                         buffer_id: buffer.read(cx).remote_id(),
                         operation: language::proto::serialize_operation(operation),
                     })
@@ -1962,19 +2012,24 @@ impl Project {
                                 Duration::from_secs(1);
 
                             let task = cx.spawn_weak(|this, mut cx| async move {
-                            cx.background().timer(DISK_BASED_DIAGNOSTICS_DEBOUNCE).await;
-                            if let Some(this) = this.upgrade(&cx) {
-                                this.update(&mut cx, |this, cx | {
-                                    this.disk_based_diagnostics_finished(language_server_id, cx);
-                                    this.broadcast_language_server_update(
-                                        language_server_id,
-                                        proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
-                                            proto::LspDiskBasedDiagnosticsUpdated {},
-                                        ),
-                                    );
-                                });
-                            }
-                        });
+                                cx.background().timer(DISK_BASED_DIAGNOSTICS_DEBOUNCE).await;
+                                if let Some(this) = this.upgrade(&cx) {
+                                    this.update(&mut cx, |this, cx| {
+                                        this.disk_based_diagnostics_finished(
+                                            language_server_id,
+                                            cx,
+                                        );
+                                        this.buffer_ordered_messages_tx
+                                            .unbounded_send(
+                                                BufferOrderedMessage::LanguageServerUpdate {
+                                                    language_server_id,
+                                                    message:proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(Default::default())
+                                                },
+                                            )
+                                            .ok();
+                                    });
+                                }
+                            });
                             *simulate_disk_based_diagnostics_completion = Some(task);
                         }
                     }
@@ -2609,7 +2664,7 @@ impl Project {
     fn on_lsp_progress(
         &mut self,
         progress: lsp::ProgressParams,
-        server_id: LanguageServerId,
+        language_server_id: LanguageServerId,
         disk_based_diagnostics_progress_token: Option<String>,
         cx: &mut ModelContext<Self>,
     ) {
@@ -2622,7 +2677,7 @@ impl Project {
         };
         let lsp::ProgressParamsValue::WorkDone(progress) = progress.value;
         let language_server_status =
-            if let Some(status) = self.language_server_statuses.get_mut(&server_id) {
+            if let Some(status) = self.language_server_statuses.get_mut(&language_server_id) {
                 status
             } else {
                 return;
@@ -2642,16 +2697,16 @@ impl Project {
             lsp::WorkDoneProgress::Begin(report) => {
                 if is_disk_based_diagnostics_progress {
                     language_server_status.has_pending_diagnostic_updates = true;
-                    self.disk_based_diagnostics_started(server_id, cx);
-                    self.broadcast_language_server_update(
-                        server_id,
-                        proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(
-                            proto::LspDiskBasedDiagnosticsUpdating {},
-                        ),
-                    );
+                    self.disk_based_diagnostics_started(language_server_id, cx);
+                    self.buffer_ordered_messages_tx
+                        .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
+                            language_server_id,
+                            message: proto::update_language_server::Variant::DiskBasedDiagnosticsUpdating(Default::default())
+                        })
+                        .ok();
                 } else {
                     self.on_lsp_work_start(
-                        server_id,
+                        language_server_id,
                         token.clone(),
                         LanguageServerProgress {
                             message: report.message.clone(),
@@ -2660,20 +2715,24 @@ impl Project {
                         },
                         cx,
                     );
-                    self.broadcast_language_server_update(
-                        server_id,
-                        proto::update_language_server::Variant::WorkStart(proto::LspWorkStart {
-                            token,
-                            message: report.message,
-                            percentage: report.percentage.map(|p| p as u32),
-                        }),
-                    );
+                    self.buffer_ordered_messages_tx
+                        .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
+                            language_server_id,
+                            message: proto::update_language_server::Variant::WorkStart(
+                                proto::LspWorkStart {
+                                    token,
+                                    message: report.message,
+                                    percentage: report.percentage.map(|p| p as u32),
+                                },
+                            ),
+                        })
+                        .ok();
                 }
             }
             lsp::WorkDoneProgress::Report(report) => {
                 if !is_disk_based_diagnostics_progress {
                     self.on_lsp_work_progress(
-                        server_id,
+                        language_server_id,
                         token.clone(),
                         LanguageServerProgress {
                             message: report.message.clone(),
@@ -2682,16 +2741,18 @@ impl Project {
                         },
                         cx,
                     );
-                    self.broadcast_language_server_update(
-                        server_id,
-                        proto::update_language_server::Variant::WorkProgress(
-                            proto::LspWorkProgress {
-                                token,
-                                message: report.message,
-                                percentage: report.percentage.map(|p| p as u32),
-                            },
-                        ),
-                    );
+                    self.buffer_ordered_messages_tx
+                        .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
+                            language_server_id,
+                            message: proto::update_language_server::Variant::WorkProgress(
+                                proto::LspWorkProgress {
+                                    token,
+                                    message: report.message,
+                                    percentage: report.percentage.map(|p| p as u32),
+                                },
+                            ),
+                        })
+                        .ok();
                 }
             }
             lsp::WorkDoneProgress::End(_) => {
@@ -2699,21 +2760,26 @@ impl Project {
 
                 if is_disk_based_diagnostics_progress {
                     language_server_status.has_pending_diagnostic_updates = false;
-                    self.disk_based_diagnostics_finished(server_id, cx);
-                    self.broadcast_language_server_update(
-                        server_id,
-                        proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
-                            proto::LspDiskBasedDiagnosticsUpdated {},
-                        ),
-                    );
+                    self.disk_based_diagnostics_finished(language_server_id, cx);
+                    self.buffer_ordered_messages_tx
+                        .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
+                            language_server_id,
+                            message:
+                                proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated(
+                                    Default::default(),
+                                ),
+                        })
+                        .ok();
                 } else {
-                    self.on_lsp_work_end(server_id, token.clone(), cx);
-                    self.broadcast_language_server_update(
-                        server_id,
-                        proto::update_language_server::Variant::WorkEnd(proto::LspWorkEnd {
-                            token,
-                        }),
-                    );
+                    self.on_lsp_work_end(language_server_id, token.clone(), cx);
+                    self.buffer_ordered_messages_tx
+                        .unbounded_send(BufferOrderedMessage::LanguageServerUpdate {
+                            language_server_id,
+                            message: proto::update_language_server::Variant::WorkEnd(
+                                proto::LspWorkEnd { token },
+                            ),
+                        })
+                        .ok();
                 }
             }
         }
@@ -2822,22 +2888,6 @@ impl Project {
         })
     }
 
-    fn broadcast_language_server_update(
-        &self,
-        language_server_id: LanguageServerId,
-        event: proto::update_language_server::Variant,
-    ) {
-        if let Some(project_id) = self.remote_id() {
-            self.client
-                .send(proto::UpdateLanguageServer {
-                    project_id,
-                    language_server_id: language_server_id.0 as u64,
-                    variant: Some(event),
-                })
-                .log_err();
-        }
-    }
-
     pub fn language_server_statuses(
         &self,
     ) -> impl DoubleEndedIterator<Item = &LanguageServerStatus> {
@@ -4866,8 +4916,8 @@ impl Project {
             if is_host {
                 this.opened_buffers
                     .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
-                this.buffer_changes_tx
-                    .unbounded_send(BufferMessage::Resync)
+                this.buffer_ordered_messages_tx
+                    .unbounded_send(BufferOrderedMessage::Resync)
                     .unwrap();
             }