Fix race conditions and bugs in Buffer::reload

Max Brunsfeld created

Change summary

crates/language2/src/buffer.rs       | 90 ++++++++++++++++-------------
crates/project2/src/project2.rs      |  4 
crates/project2/src/project_tests.rs | 64 +++++++++++++++++++++
crates/project2/src/worktree.rs      |  3 
crates/rope2/src/rope2.rs            |  6 +
5 files changed, 123 insertions(+), 44 deletions(-)

Detailed changes

crates/language2/src/buffer.rs 🔗

@@ -16,7 +16,7 @@ use crate::{
 };
 use anyhow::{anyhow, Result};
 pub use clock::ReplicaId;
-use futures::FutureExt as _;
+use futures::channel::oneshot;
 use gpui::{AppContext, EventEmitter, HighlightStyle, ModelContext, Task, TaskLabel};
 use lazy_static::lazy_static;
 use lsp::LanguageServerId;
@@ -45,7 +45,7 @@ pub use text::{Buffer as TextBuffer, BufferSnapshot as TextBufferSnapshot, *};
 use theme::SyntaxTheme;
 #[cfg(any(test, feature = "test-support"))]
 use util::RandomCharIter;
-use util::{RangeExt, TryFutureExt as _};
+use util::RangeExt;
 
 #[cfg(any(test, feature = "test-support"))]
 pub use {tree_sitter_rust, tree_sitter_typescript};
@@ -66,6 +66,7 @@ pub struct Buffer {
     saved_mtime: SystemTime,
     transaction_depth: usize,
     was_dirty_before_starting_transaction: Option<bool>,
+    reload_task: Option<Task<Result<()>>>,
     language: Option<Arc<Language>>,
     autoindent_requests: Vec<Arc<AutoindentRequest>>,
     pending_autoindent: Option<Task<()>>,
@@ -473,6 +474,7 @@ impl Buffer {
             saved_mtime,
             saved_version: buffer.version(),
             saved_version_fingerprint: buffer.as_rope().fingerprint(),
+            reload_task: None,
             transaction_depth: 0,
             was_dirty_before_starting_transaction: None,
             text: buffer,
@@ -572,37 +574,52 @@ impl Buffer {
         cx.notify();
     }
 
-    pub fn reload(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<Option<Transaction>>> {
-        cx.spawn(|this, mut cx| async move {
-            if let Some((new_mtime, new_text)) = this.update(&mut cx, |this, cx| {
+    pub fn reload(
+        &mut self,
+        cx: &mut ModelContext<Self>,
+    ) -> oneshot::Receiver<Option<Transaction>> {
+        let (tx, rx) = futures::channel::oneshot::channel();
+        let prev_version = self.text.version();
+        self.reload_task = Some(cx.spawn(|this, mut cx| async move {
+            let Some((new_mtime, new_text)) = this.update(&mut cx, |this, cx| {
                 let file = this.file.as_ref()?.as_local()?;
                 Some((file.mtime(), file.load(cx)))
-            })? {
-                let new_text = new_text.await?;
-                let diff = this
-                    .update(&mut cx, |this, cx| this.diff(new_text, cx))?
-                    .await;
-                this.update(&mut cx, |this, cx| {
-                    if this.version() == diff.base_version {
-                        this.finalize_last_transaction();
-                        this.apply_diff(diff, cx);
-                        if let Some(transaction) = this.finalize_last_transaction().cloned() {
-                            this.did_reload(
-                                this.version(),
-                                this.as_rope().fingerprint(),
-                                this.line_ending(),
-                                new_mtime,
-                                cx,
-                            );
-                            return Some(transaction);
-                        }
-                    }
-                    None
-                })
-            } else {
-                Ok(None)
-            }
-        })
+            })?
+            else {
+                return Ok(());
+            };
+
+            let new_text = new_text.await?;
+            let diff = this
+                .update(&mut cx, |this, cx| this.diff(new_text.clone(), cx))?
+                .await;
+            this.update(&mut cx, |this, cx| {
+                if this.version() == diff.base_version {
+                    this.finalize_last_transaction();
+                    this.apply_diff(diff, cx);
+                    tx.send(this.finalize_last_transaction().cloned()).ok();
+
+                    this.did_reload(
+                        this.version(),
+                        this.as_rope().fingerprint(),
+                        this.line_ending(),
+                        new_mtime,
+                        cx,
+                    );
+                } else {
+                    this.did_reload(
+                        prev_version,
+                        Rope::text_fingerprint(&new_text),
+                        this.line_ending(),
+                        new_mtime,
+                        cx,
+                    );
+                }
+
+                this.reload_task.take();
+            })
+        }));
+        rx
     }
 
     pub fn did_reload(
@@ -631,13 +648,8 @@ impl Buffer {
         cx.notify();
     }
 
-    pub fn file_updated(
-        &mut self,
-        new_file: Arc<dyn File>,
-        cx: &mut ModelContext<Self>,
-    ) -> Task<()> {
+    pub fn file_updated(&mut self, new_file: Arc<dyn File>, cx: &mut ModelContext<Self>) {
         let mut file_changed = false;
-        let mut task = Task::ready(());
 
         if let Some(old_file) = self.file.as_ref() {
             if new_file.path() != old_file.path() {
@@ -657,8 +669,7 @@ impl Buffer {
                     file_changed = true;
 
                     if !self.is_dirty() {
-                        let reload = self.reload(cx).log_err().map(drop);
-                        task = cx.background_executor().spawn(reload);
+                        self.reload(cx).close();
                     }
                 }
             }
@@ -672,7 +683,6 @@ impl Buffer {
             cx.emit(Event::FileHandleChanged);
             cx.notify();
         }
-        task
     }
 
     pub fn diff_base(&self) -> Option<&str> {

crates/project2/src/project2.rs 🔗

@@ -6262,7 +6262,7 @@ impl Project {
                                 .log_err();
                         }
 
-                        buffer.file_updated(Arc::new(new_file), cx).detach();
+                        buffer.file_updated(Arc::new(new_file), cx);
                     }
                 }
             });
@@ -7256,7 +7256,7 @@ impl Project {
                     .ok_or_else(|| anyhow!("no such worktree"))?;
                 let file = File::from_proto(file, worktree, cx)?;
                 buffer.update(cx, |buffer, cx| {
-                    buffer.file_updated(Arc::new(file), cx).detach();
+                    buffer.file_updated(Arc::new(file), cx);
                 });
                 this.detect_language_for_buffer(&buffer, cx);
             }

crates/project2/src/project_tests.rs 🔗

@@ -2587,6 +2587,70 @@ async fn test_save_file(cx: &mut gpui::TestAppContext) {
     assert_eq!(new_text, buffer.update(cx, |buffer, _| buffer.text()));
 }
 
+#[gpui::test(iterations = 10)]
+async fn test_file_changes_multiple_times_on_disk(cx: &mut gpui::TestAppContext) {
+    init_test(cx);
+
+    let fs = FakeFs::new(cx.executor().clone());
+    fs.insert_tree(
+        "/dir",
+        json!({
+            "file1": "the original contents",
+        }),
+    )
+    .await;
+
+    let project = Project::test(fs.clone(), ["/dir".as_ref()], cx).await;
+    let worktree = project.read_with(cx, |project, _| project.worktrees().next().unwrap());
+    let buffer = project
+        .update(cx, |p, cx| p.open_local_buffer("/dir/file1", cx))
+        .await
+        .unwrap();
+
+    // Simulate buffer diffs being slow, so that they don't complete before
+    // the next file change occurs.
+    cx.executor().deprioritize_task(*language::BUFFER_DIFF_TASK);
+
+    // Change the buffer's file on disk, and then wait for the file change
+    // to be detected by the worktree, so that the buffer starts reloading.
+    fs.save(
+        "/dir/file1".as_ref(),
+        &"the first contents".into(),
+        Default::default(),
+    )
+    .await
+    .unwrap();
+    worktree.next_event(cx);
+
+    // Change the buffer's file again. Depending on the random seed, the
+    // previous file change may still be in progress.
+    fs.save(
+        "/dir/file1".as_ref(),
+        &"the second contents".into(),
+        Default::default(),
+    )
+    .await
+    .unwrap();
+    worktree.next_event(cx);
+
+    cx.executor().run_until_parked();
+    let on_disk_text = fs.load(Path::new("/dir/file1")).await.unwrap();
+    buffer.read_with(cx, |buffer, _| {
+        let buffer_text = buffer.text();
+        if buffer_text == on_disk_text {
+            assert!(!buffer.is_dirty(), "buffer shouldn't be dirty. text: {buffer_text:?}, disk text: {on_disk_text:?}");
+        }
+        // If the file change occurred while the buffer was processing the first
+        // change, the buffer may be in a conflicting state.
+        else {
+            assert!(
+                buffer.is_dirty(),
+                "buffer should report that it is dirty. text: {buffer_text:?}, disk text: {on_disk_text:?}"
+            );
+        }
+    });
+}
+
 #[gpui::test]
 async fn test_save_in_single_file_worktree(cx: &mut gpui::TestAppContext) {
     init_test(cx);

crates/project2/src/worktree.rs 🔗

@@ -276,6 +276,7 @@ struct ShareState {
     _maintain_remote_snapshot: Task<Option<()>>,
 }
 
+#[derive(Clone)]
 pub enum Event {
     UpdatedEntries(UpdatedEntriesSet),
     UpdatedGitRepositories(UpdatedGitRepositoriesSet),
@@ -961,7 +962,7 @@ impl LocalWorktree {
 
                 buffer_handle.update(&mut cx, |buffer, cx| {
                     if has_changed_file {
-                        buffer.file_updated(new_file, cx).detach();
+                        buffer.file_updated(new_file, cx);
                     }
                 })?;
             }

crates/rope2/src/rope2.rs 🔗

@@ -41,6 +41,10 @@ impl Rope {
         Self::default()
     }
 
+    pub fn text_fingerprint(text: &str) -> RopeFingerprint {
+        bromberg_sl2::hash_strict(text.as_bytes())
+    }
+
     pub fn append(&mut self, rope: Rope) {
         let mut chunks = rope.chunks.cursor::<()>();
         chunks.next(&());
@@ -931,7 +935,7 @@ impl<'a> From<&'a str> for ChunkSummary {
     fn from(text: &'a str) -> Self {
         Self {
             text: TextSummary::from(text),
-            fingerprint: bromberg_sl2::hash_strict(text.as_bytes()),
+            fingerprint: Rope::text_fingerprint(text),
         }
     }
 }