Store an Fs on Workspace, pass it to each Worktree

Max Brunsfeld and Nathan Sobo created

Push test-only worktree behavior down into the Fs,
via a `watch` method which provides fs events.

Co-Authored-By: Nathan Sobo <nathan@zed.dev>

Change summary

server/src/tests.rs      |  19 +-
zed/src/editor/buffer.rs |   8 
zed/src/file_finder.rs   |  31 ----
zed/src/lib.rs           |   1 
zed/src/main.rs          |   4 
zed/src/test.rs          |   5 
zed/src/workspace.rs     | 124 ++++++-------------
zed/src/worktree.rs      | 265 ++++++++++++++++++++++-------------------
8 files changed, 213 insertions(+), 244 deletions(-)

Detailed changes

server/src/tests.rs 🔗

@@ -19,7 +19,7 @@ use zed::{
     rpc::Client,
     settings,
     test::{temp_tree, Channel},
-    worktree::{Fs, InMemoryFs, Worktree},
+    worktree::{FakeFs, Fs, RealFs, Worktree},
 };
 use zed_rpc::{ForegroundRouter, Peer, Router};
 
@@ -39,7 +39,8 @@ async fn test_share_worktree(mut cx_a: TestAppContext, mut cx_b: TestAppContext)
         "a.txt": "a-contents",
         "b.txt": "b-contents",
     }));
-    let worktree_a = cx_a.add_model(|cx| Worktree::local(dir.path(), lang_registry.clone(), cx));
+    let worktree_a = cx_a
+        .add_model(|cx| Worktree::local(dir.path(), lang_registry.clone(), Arc::new(RealFs), cx));
     worktree_a
         .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
         .await;
@@ -133,7 +134,8 @@ async fn test_propagate_saves_and_fs_changes_in_shared_worktree(
         "file1": "",
         "file2": ""
     }));
-    let worktree_a = cx_a.add_model(|cx| Worktree::local(dir.path(), lang_registry.clone(), cx));
+    let worktree_a = cx_a
+        .add_model(|cx| Worktree::local(dir.path(), lang_registry.clone(), Arc::new(RealFs), cx));
     worktree_a
         .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
         .await;
@@ -243,12 +245,12 @@ async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: Tes
     let client_b = server.create_client(&mut cx_b, "user_b").await;
 
     // Share a local worktree as client A
-    let fs = Arc::new(InMemoryFs::new());
+    let fs = Arc::new(FakeFs::new());
     fs.save(Path::new("/a.txt"), &"a-contents".into())
         .await
         .unwrap();
     let worktree_a =
-        cx_a.add_model(|cx| Worktree::test(Path::new("/"), lang_registry.clone(), fs.clone(), cx));
+        cx_a.add_model(|cx| Worktree::local(Path::new("/"), lang_registry.clone(), fs.clone(), cx));
     worktree_a
         .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
         .await;
@@ -314,12 +316,12 @@ async fn test_editing_while_guest_opens_buffer(mut cx_a: TestAppContext, mut cx_
     let client_b = server.create_client(&mut cx_b, "user_b").await;
 
     // Share a local worktree as client A
-    let fs = Arc::new(InMemoryFs::new());
+    let fs = Arc::new(FakeFs::new());
     fs.save(Path::new("/a.txt"), &"a-contents".into())
         .await
         .unwrap();
     let worktree_a =
-        cx_a.add_model(|cx| Worktree::test(Path::new("/"), lang_registry.clone(), fs.clone(), cx));
+        cx_a.add_model(|cx| Worktree::local(Path::new("/"), lang_registry.clone(), fs.clone(), cx));
     worktree_a
         .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
         .await;
@@ -371,7 +373,8 @@ async fn test_peer_disconnection(mut cx_a: TestAppContext, cx_b: TestAppContext)
         "a.txt": "a-contents",
         "b.txt": "b-contents",
     }));
-    let worktree_a = cx_a.add_model(|cx| Worktree::local(dir.path(), lang_registry.clone(), cx));
+    let worktree_a = cx_a
+        .add_model(|cx| Worktree::local(dir.path(), lang_registry.clone(), Arc::new(RealFs), cx));
     worktree_a
         .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
         .await;

zed/src/editor/buffer.rs 🔗

@@ -2734,7 +2734,7 @@ mod tests {
     use crate::{
         test::{build_app_state, temp_tree},
         util::RandomCharIter,
-        worktree::{Worktree, WorktreeHandle},
+        worktree::{RealFs, Worktree, WorktreeHandle},
     };
     use gpui::ModelHandle;
     use rand::prelude::*;
@@ -3209,7 +3209,8 @@ mod tests {
             "file2": "def",
             "file3": "ghi",
         }));
-        let tree = cx.add_model(|cx| Worktree::local(dir.path(), Default::default(), cx));
+        let tree = cx
+            .add_model(|cx| Worktree::local(dir.path(), Default::default(), Arc::new(RealFs), cx));
         tree.flush_fs_events(&cx).await;
         cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
             .await;
@@ -3321,7 +3322,8 @@ mod tests {
     async fn test_file_changes_on_disk(mut cx: gpui::TestAppContext) {
         let initial_contents = "aaa\nbbbbb\nc\n";
         let dir = temp_tree(json!({ "the-file": initial_contents }));
-        let tree = cx.add_model(|cx| Worktree::local(dir.path(), Default::default(), cx));
+        let tree = cx
+            .add_model(|cx| Worktree::local(dir.path(), Default::default(), Arc::new(RealFs), cx));
         cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
             .await;
 

zed/src/file_finder.rs 🔗

@@ -479,12 +479,7 @@ mod tests {
 
         let app_state = cx.read(build_app_state);
         let (window_id, workspace) = cx.add_window(|cx| {
-            let mut workspace = Workspace::new(
-                app_state.settings.clone(),
-                app_state.languages.clone(),
-                app_state.rpc.clone(),
-                cx,
-            );
+            let mut workspace = Workspace::new(&app_state, cx);
             workspace.add_worktree(tmp_dir.path(), cx);
             workspace
         });
@@ -551,12 +546,7 @@ mod tests {
         }));
         let app_state = cx.read(build_app_state);
         let (_, workspace) = cx.add_window(|cx| {
-            let mut workspace = Workspace::new(
-                app_state.settings.clone(),
-                app_state.languages.clone(),
-                app_state.rpc.clone(),
-                cx,
-            );
+            let mut workspace = Workspace::new(&app_state, cx);
             workspace.add_worktree(tmp_dir.path(), cx);
             workspace
         });
@@ -614,12 +604,7 @@ mod tests {
 
         let app_state = cx.read(build_app_state);
         let (_, workspace) = cx.add_window(|cx| {
-            let mut workspace = Workspace::new(
-                app_state.settings.clone(),
-                app_state.languages.clone(),
-                app_state.rpc.clone(),
-                cx,
-            );
+            let mut workspace = Workspace::new(&app_state, cx);
             workspace.add_worktree(&file_path, cx);
             workspace
         });
@@ -663,15 +648,7 @@ mod tests {
         }));
 
         let app_state = cx.read(build_app_state);
-
-        let (_, workspace) = cx.add_window(|cx| {
-            Workspace::new(
-                app_state.settings.clone(),
-                app_state.languages.clone(),
-                app_state.rpc.clone(),
-                cx,
-            )
-        });
+        let (_, workspace) = cx.add_window(|cx| Workspace::new(&app_state, cx));
 
         workspace
             .update(&mut cx, |workspace, cx| {

zed/src/lib.rs 🔗

@@ -21,6 +21,7 @@ pub struct AppState {
     pub languages: std::sync::Arc<language::LanguageRegistry>,
     pub rpc_router: std::sync::Arc<ForegroundRouter>,
     pub rpc: rpc::Client,
+    pub fs: std::sync::Arc<dyn worktree::Fs>,
 }
 
 pub fn init(cx: &mut gpui::MutableAppContext) {

zed/src/main.rs 🔗

@@ -8,7 +8,8 @@ use std::{fs, path::PathBuf, sync::Arc};
 use zed::{
     self, assets, editor, file_finder, language, menus, rpc, settings,
     workspace::{self, OpenParams},
-    worktree, AppState,
+    worktree::{self, RealFs},
+    AppState,
 };
 use zed_rpc::ForegroundRouter;
 
@@ -26,6 +27,7 @@ fn main() {
         settings,
         rpc_router: Arc::new(ForegroundRouter::new()),
         rpc: rpc::Client::new(languages),
+        fs: Arc::new(RealFs),
     };
 
     app.run(move |cx| {

zed/src/test.rs 🔗

@@ -1,4 +1,6 @@
-use crate::{language::LanguageRegistry, rpc, settings, time::ReplicaId, AppState};
+use crate::{
+    language::LanguageRegistry, rpc, settings, time::ReplicaId, worktree::RealFs, AppState,
+};
 use gpui::AppContext;
 use std::{
     path::{Path, PathBuf},
@@ -152,5 +154,6 @@ pub fn build_app_state(cx: &AppContext) -> Arc<AppState> {
         languages: languages.clone(),
         rpc_router: Arc::new(ForegroundRouter::new()),
         rpc: rpc::Client::new(languages),
+        fs: Arc::new(RealFs),
     })
 }

zed/src/workspace.rs 🔗

@@ -6,7 +6,7 @@ use crate::{
     language::LanguageRegistry,
     rpc,
     settings::Settings,
-    worktree::{File, Worktree},
+    worktree::{File, Fs, Worktree},
     AppState,
 };
 use anyhow::{anyhow, Result};
@@ -90,12 +90,7 @@ fn open_paths(params: &OpenParams, cx: &mut MutableAppContext) {
 
     // Add a new workspace if necessary
     cx.add_window(|cx| {
-        let mut view = Workspace::new(
-            params.app_state.settings.clone(),
-            params.app_state.languages.clone(),
-            params.app_state.rpc.clone(),
-            cx,
-        );
+        let mut view = Workspace::new(&params.app_state, cx);
         let open_paths = view.open_paths(&params.paths, cx);
         cx.foreground().spawn(open_paths).detach();
         view
@@ -104,12 +99,7 @@ fn open_paths(params: &OpenParams, cx: &mut MutableAppContext) {
 
 fn open_new(app_state: &Arc<AppState>, cx: &mut MutableAppContext) {
     cx.add_window(|cx| {
-        let mut view = Workspace::new(
-            app_state.settings.clone(),
-            app_state.languages.clone(),
-            app_state.rpc.clone(),
-            cx,
-        );
+        let mut view = Workspace::new(app_state.as_ref(), cx);
         view.open_new_file(&app_state, cx);
         view
     });
@@ -117,12 +107,7 @@ fn open_new(app_state: &Arc<AppState>, cx: &mut MutableAppContext) {
 
 fn join_worktree(app_state: &Arc<AppState>, cx: &mut MutableAppContext) {
     cx.add_window(|cx| {
-        let mut view = Workspace::new(
-            app_state.settings.clone(),
-            app_state.languages.clone(),
-            app_state.rpc.clone(),
-            cx,
-        );
+        let mut view = Workspace::new(app_state.as_ref(), cx);
         view.join_worktree(&app_state, cx);
         view
     });
@@ -328,6 +313,7 @@ pub struct Workspace {
     pub settings: watch::Receiver<Settings>,
     languages: Arc<LanguageRegistry>,
     rpc: rpc::Client,
+    fs: Arc<dyn Fs>,
     modal: Option<AnyViewHandle>,
     center: PaneGroup,
     panes: Vec<ViewHandle<Pane>>,
@@ -341,13 +327,8 @@ pub struct Workspace {
 }
 
 impl Workspace {
-    pub fn new(
-        settings: watch::Receiver<Settings>,
-        languages: Arc<LanguageRegistry>,
-        rpc: rpc::Client,
-        cx: &mut ViewContext<Self>,
-    ) -> Self {
-        let pane = cx.add_view(|_| Pane::new(settings.clone()));
+    pub fn new(app_state: &AppState, cx: &mut ViewContext<Self>) -> Self {
+        let pane = cx.add_view(|_| Pane::new(app_state.settings.clone()));
         let pane_id = pane.id();
         cx.subscribe_to_view(&pane, move |me, _, event, cx| {
             me.handle_pane_event(pane_id, event, cx)
@@ -359,9 +340,10 @@ impl Workspace {
             center: PaneGroup::new(pane.id()),
             panes: vec![pane.clone()],
             active_pane: pane.clone(),
-            settings,
-            languages: languages,
-            rpc,
+            settings: app_state.settings.clone(),
+            languages: app_state.languages.clone(),
+            rpc: app_state.rpc.clone(),
+            fs: app_state.fs.clone(),
             worktrees: Default::default(),
             items: Default::default(),
             loading_items: Default::default(),
@@ -411,18 +393,20 @@ impl Workspace {
             .map(|path| self.entry_id_for_path(&path, cx))
             .collect::<Vec<_>>();
 
-        let bg = cx.background_executor().clone();
+        let fs = self.fs.clone();
         let tasks = abs_paths
             .iter()
             .cloned()
             .zip(entries.into_iter())
             .map(|(abs_path, entry_id)| {
-                let is_file = bg.spawn(async move { abs_path.is_file() });
-                cx.spawn(|this, mut cx| async move {
-                    if is_file.await {
-                        return this.update(&mut cx, |this, cx| this.open_entry(entry_id, cx));
-                    } else {
-                        None
+                cx.spawn(|this, mut cx| {
+                    let fs = fs.clone();
+                    async move {
+                        if fs.is_file(&abs_path).await {
+                            return this.update(&mut cx, |this, cx| this.open_entry(entry_id, cx));
+                        } else {
+                            None
+                        }
                     }
                 })
             })
@@ -476,7 +460,8 @@ impl Workspace {
         path: &Path,
         cx: &mut ViewContext<Self>,
     ) -> ModelHandle<Worktree> {
-        let worktree = cx.add_model(|cx| Worktree::local(path, self.languages.clone(), cx));
+        let worktree =
+            cx.add_model(|cx| Worktree::local(path, self.languages.clone(), self.fs.clone(), cx));
         cx.observe_model(&worktree, |_, _, cx| cx.notify());
         self.worktrees.insert(worktree.clone());
         cx.notify();
@@ -912,7 +897,7 @@ mod tests {
     use crate::{
         editor::Editor,
         test::{build_app_state, temp_tree},
-        worktree::WorktreeHandle,
+        worktree::{FakeFs, WorktreeHandle},
     };
     use serde_json::json;
     use std::{collections::HashSet, fs};
@@ -990,12 +975,7 @@ mod tests {
         let app_state = cx.read(build_app_state);
 
         let (_, workspace) = cx.add_window(|cx| {
-            let mut workspace = Workspace::new(
-                app_state.settings.clone(),
-                app_state.languages.clone(),
-                app_state.rpc.clone(),
-                cx,
-            );
+            let mut workspace = Workspace::new(&app_state, cx);
             workspace.add_worktree(dir.path(), cx);
             workspace
         });
@@ -1088,22 +1068,18 @@ mod tests {
 
     #[gpui::test]
     async fn test_open_paths(mut cx: gpui::TestAppContext) {
-        let dir1 = temp_tree(json!({
-            "a.txt": "",
-        }));
-        let dir2 = temp_tree(json!({
-            "b.txt": "",
-        }));
+        let fs = FakeFs::new();
+        fs.insert_dir("/dir1").await.unwrap();
+        fs.insert_dir("/dir2").await.unwrap();
+        fs.insert_file("/dir1/a.txt", "".into()).await.unwrap();
+        fs.insert_file("/dir2/b.txt", "".into()).await.unwrap();
+
+        let mut app_state = cx.read(build_app_state);
+        Arc::get_mut(&mut app_state).unwrap().fs = Arc::new(fs);
 
-        let app_state = cx.read(build_app_state);
         let (_, workspace) = cx.add_window(|cx| {
-            let mut workspace = Workspace::new(
-                app_state.settings.clone(),
-                app_state.languages.clone(),
-                app_state.rpc.clone(),
-                cx,
-            );
-            workspace.add_worktree(dir1.path(), cx);
+            let mut workspace = Workspace::new(&app_state, cx);
+            workspace.add_worktree("/dir1".as_ref(), cx);
             workspace
         });
         cx.read(|cx| workspace.read(cx).worktree_scans_complete(cx))
@@ -1111,9 +1087,7 @@ mod tests {
 
         // Open a file within an existing worktree.
         cx.update(|cx| {
-            workspace.update(cx, |view, cx| {
-                view.open_paths(&[dir1.path().join("a.txt")], cx)
-            })
+            workspace.update(cx, |view, cx| view.open_paths(&["/dir1/a.txt".into()], cx))
         })
         .await;
         cx.read(|cx| {
@@ -1131,9 +1105,7 @@ mod tests {
 
         // Open a file outside of any existing worktree.
         cx.update(|cx| {
-            workspace.update(cx, |view, cx| {
-                view.open_paths(&[dir2.path().join("b.txt")], cx)
-            })
+            workspace.update(cx, |view, cx| view.open_paths(&["/dir2/b.txt".into()], cx))
         })
         .await;
         cx.read(|cx| {
@@ -1145,8 +1117,9 @@ mod tests {
                 .collect::<HashSet<_>>();
             assert_eq!(
                 worktree_roots,
-                vec![dir1.path(), &dir2.path().join("b.txt")]
+                vec!["/dir1", "/dir2/b.txt"]
                     .into_iter()
+                    .map(Path::new)
                     .collect(),
             );
             assert_eq!(
@@ -1170,12 +1143,7 @@ mod tests {
 
         let app_state = cx.read(build_app_state);
         let (window_id, workspace) = cx.add_window(|cx| {
-            let mut workspace = Workspace::new(
-                app_state.settings.clone(),
-                app_state.languages.clone(),
-                app_state.rpc.clone(),
-                cx,
-            );
+            let mut workspace = Workspace::new(&app_state, cx);
             workspace.add_worktree(dir.path(), cx);
             workspace
         });
@@ -1218,12 +1186,7 @@ mod tests {
         let dir = TempDir::new("test-new-file").unwrap();
         let app_state = cx.read(build_app_state);
         let (_, workspace) = cx.add_window(|cx| {
-            let mut workspace = Workspace::new(
-                app_state.settings.clone(),
-                app_state.languages.clone(),
-                app_state.rpc.clone(),
-                cx,
-            );
+            let mut workspace = Workspace::new(&app_state, cx);
             workspace.add_worktree(dir.path(), cx);
             workspace
         });
@@ -1343,12 +1306,7 @@ mod tests {
 
         let app_state = cx.read(build_app_state);
         let (window_id, workspace) = cx.add_window(|cx| {
-            let mut workspace = Workspace::new(
-                app_state.settings.clone(),
-                app_state.languages.clone(),
-                app_state.rpc.clone(),
-                cx,
-            );
+            let mut workspace = Workspace::new(&app_state, cx);
             workspace.add_worktree(dir.path(), cx);
             workspace
         });

zed/src/worktree.rs 🔗

@@ -14,6 +14,7 @@ use crate::{
 use ::ignore::gitignore::Gitignore;
 use anyhow::{anyhow, Context, Result};
 use atomic::Ordering::SeqCst;
+use fsevent::EventStream;
 use futures::{Stream, StreamExt};
 pub use fuzzy::{match_paths, PathMatch};
 use gpui::{
@@ -84,12 +85,19 @@ pub trait Fs: Send + Sync {
     async fn load(&self, path: &Path) -> Result<String>;
     async fn save(&self, path: &Path, text: &Rope) -> Result<()>;
     async fn canonicalize(&self, path: &Path) -> Result<PathBuf>;
+    async fn is_file(&self, path: &Path) -> bool;
+    fn watch(
+        &self,
+        path: &Path,
+        latency: Duration,
+    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>>;
+    fn is_fake(&self) -> bool;
 }
 
-struct ProductionFs;
+pub struct RealFs;
 
 #[async_trait::async_trait]
-impl Fs for ProductionFs {
+impl Fs for RealFs {
     async fn entry(
         &self,
         root_char_bag: CharBag,
@@ -188,10 +196,34 @@ impl Fs for ProductionFs {
     async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
         Ok(smol::fs::canonicalize(path).await?)
     }
+
+    async fn is_file(&self, path: &Path) -> bool {
+        smol::fs::metadata(path)
+            .await
+            .map_or(false, |metadata| metadata.is_file())
+    }
+
+    fn watch(
+        &self,
+        path: &Path,
+        latency: Duration,
+    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
+        let (mut tx, rx) = postage::mpsc::channel(64);
+        let (stream, handle) = EventStream::new(&[path], latency);
+        std::mem::forget(handle);
+        std::thread::spawn(move || {
+            stream.run(move |events| smol::block_on(tx.send(events)).is_ok());
+        });
+        Box::pin(rx)
+    }
+
+    fn is_fake(&self) -> bool {
+        false
+    }
 }
 
 #[derive(Clone, Debug)]
-struct InMemoryEntry {
+struct FakeFsEntry {
     inode: u64,
     mtime: SystemTime,
     is_dir: bool,
@@ -200,14 +232,14 @@ struct InMemoryEntry {
 }
 
 #[cfg(any(test, feature = "test-support"))]
-struct InMemoryFsState {
-    entries: std::collections::BTreeMap<PathBuf, InMemoryEntry>,
+struct FakeFsState {
+    entries: std::collections::BTreeMap<PathBuf, FakeFsEntry>,
     next_inode: u64,
-    events_tx: postage::broadcast::Sender<fsevent::Event>,
+    events_tx: postage::broadcast::Sender<Vec<fsevent::Event>>,
 }
 
 #[cfg(any(test, feature = "test-support"))]
-impl InMemoryFsState {
+impl FakeFsState {
     fn validate_path(&self, path: &Path) -> Result<()> {
         if path.is_absolute()
             && path
@@ -221,31 +253,33 @@ impl InMemoryFsState {
         }
     }
 
-    async fn emit_event(&mut self, path: &Path) {
-        let _ = self
-            .events_tx
-            .send(fsevent::Event {
+    async fn emit_event(&mut self, paths: &[&Path]) {
+        let events = paths
+            .iter()
+            .map(|path| fsevent::Event {
                 event_id: 0,
                 flags: fsevent::StreamFlags::empty(),
                 path: path.to_path_buf(),
             })
-            .await;
+            .collect();
+
+        let _ = self.events_tx.send(events).await;
     }
 }
 
 #[cfg(any(test, feature = "test-support"))]
-pub struct InMemoryFs {
-    state: smol::lock::RwLock<InMemoryFsState>,
+pub struct FakeFs {
+    state: smol::lock::RwLock<FakeFsState>,
 }
 
 #[cfg(any(test, feature = "test-support"))]
-impl InMemoryFs {
+impl FakeFs {
     pub fn new() -> Self {
         let (events_tx, _) = postage::broadcast::channel(2048);
         let mut entries = std::collections::BTreeMap::new();
         entries.insert(
             Path::new("/").to_path_buf(),
-            InMemoryEntry {
+            FakeFsEntry {
                 inode: 0,
                 mtime: SystemTime::now(),
                 is_dir: true,
@@ -254,7 +288,7 @@ impl InMemoryFs {
             },
         );
         Self {
-            state: smol::lock::RwLock::new(InMemoryFsState {
+            state: smol::lock::RwLock::new(FakeFsState {
                 entries,
                 next_inode: 1,
                 events_tx,
@@ -262,15 +296,16 @@ impl InMemoryFs {
         }
     }
 
-    pub async fn insert_dir(&self, path: &Path) -> Result<()> {
+    pub async fn insert_dir(&self, path: impl AsRef<Path>) -> Result<()> {
         let mut state = self.state.write().await;
+        let path = path.as_ref();
         state.validate_path(path)?;
 
         let inode = state.next_inode;
         state.next_inode += 1;
         state.entries.insert(
             path.to_path_buf(),
-            InMemoryEntry {
+            FakeFsEntry {
                 inode,
                 mtime: SystemTime::now(),
                 is_dir: true,
@@ -278,7 +313,28 @@ impl InMemoryFs {
                 content: None,
             },
         );
-        state.emit_event(path).await;
+        state.emit_event(&[path]).await;
+        Ok(())
+    }
+
+    pub async fn insert_file(&self, path: impl AsRef<Path>, content: String) -> Result<()> {
+        let mut state = self.state.write().await;
+        let path = path.as_ref();
+        state.validate_path(path)?;
+
+        let inode = state.next_inode;
+        state.next_inode += 1;
+        state.entries.insert(
+            path.to_path_buf(),
+            FakeFsEntry {
+                inode,
+                mtime: SystemTime::now(),
+                is_dir: false,
+                is_symlink: false,
+                content: Some(content),
+            },
+        );
+        state.emit_event(&[path]).await;
         Ok(())
     }
 
@@ -286,7 +342,7 @@ impl InMemoryFs {
         let mut state = self.state.write().await;
         state.validate_path(path)?;
         state.entries.retain(|path, _| !path.starts_with(path));
-        state.emit_event(&path).await;
+        state.emit_event(&[path]).await;
         Ok(())
     }
 
@@ -312,21 +368,15 @@ impl InMemoryFs {
                 state.entries.insert(new_path, entry);
             }
 
-            state.emit_event(source).await;
-            state.emit_event(target).await;
-
+            state.emit_event(&[source, target]).await;
             Ok(())
         }
     }
-
-    pub async fn events(&self) -> postage::broadcast::Receiver<fsevent::Event> {
-        self.state.read().await.events_tx.subscribe()
-    }
 }
 
 #[cfg(any(test, feature = "test-support"))]
 #[async_trait::async_trait]
-impl Fs for InMemoryFs {
+impl Fs for FakeFs {
     async fn entry(
         &self,
         root_char_bag: CharBag,
@@ -405,13 +455,13 @@ impl Fs for InMemoryFs {
             } else {
                 entry.content = Some(text.chunks().collect());
                 entry.mtime = SystemTime::now();
-                state.emit_event(path).await;
+                state.emit_event(&[path]).await;
                 Ok(())
             }
         } else {
             let inode = state.next_inode;
             state.next_inode += 1;
-            let entry = InMemoryEntry {
+            let entry = FakeFsEntry {
                 inode,
                 mtime: SystemTime::now(),
                 is_dir: false,
@@ -419,7 +469,7 @@ impl Fs for InMemoryFs {
                 content: Some(text.chunks().collect()),
             };
             state.entries.insert(path.to_path_buf(), entry);
-            state.emit_event(path).await;
+            state.emit_event(&[path]).await;
             Ok(())
         }
     }
@@ -427,6 +477,29 @@ impl Fs for InMemoryFs {
     async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
         Ok(path.to_path_buf())
     }
+
+    async fn is_file(&self, path: &Path) -> bool {
+        let state = self.state.read().await;
+        state.entries.get(path).map_or(false, |entry| !entry.is_dir)
+    }
+
+    fn watch(
+        &self,
+        path: &Path,
+        _: Duration,
+    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
+        let state = smol::block_on(self.state.read());
+        let rx = state.events_tx.subscribe();
+        let path = path.to_path_buf();
+        Box::pin(futures::StreamExt::filter(rx, move |events| {
+            let result = events.iter().any(|event| event.path.starts_with(&path));
+            async move { result }
+        }))
+    }
+
+    fn is_fake(&self) -> bool {
+        true
+    }
 }
 
 #[derive(Clone, Debug)]
@@ -470,49 +543,22 @@ impl Worktree {
     pub fn local(
         path: impl Into<Arc<Path>>,
         languages: Arc<LanguageRegistry>,
+        fs: Arc<dyn Fs>,
         cx: &mut ModelContext<Worktree>,
     ) -> Self {
-        let fs = Arc::new(ProductionFs);
-        let (mut tree, scan_states_tx) =
-            LocalWorktree::new(path, languages, fs.clone(), Duration::from_millis(100), cx);
-        let (event_stream, event_stream_handle) = fsevent::EventStream::new(
-            &[tree.snapshot.abs_path.as_ref()],
-            Duration::from_millis(100),
-        );
+        let (mut tree, scan_states_tx) = LocalWorktree::new(path, languages, fs.clone(), cx);
+
+        let events = fs.watch(tree.snapshot.abs_path.as_ref(), Duration::from_millis(100));
         let background_snapshot = tree.background_snapshot.clone();
-        std::thread::spawn(move || {
+        tree._background_scanner_task = Some(cx.background().spawn(async move {
             let scanner = BackgroundScanner::new(
                 background_snapshot,
                 scan_states_tx,
                 fs,
                 Arc::new(executor::Background::new()),
             );
-            scanner.run(event_stream);
-        });
-        tree._event_stream_handle = Some(event_stream_handle);
-        Worktree::Local(tree)
-    }
-
-    #[cfg(any(test, feature = "test-support"))]
-    pub fn test(
-        path: impl Into<Arc<Path>>,
-        languages: Arc<LanguageRegistry>,
-        fs: Arc<InMemoryFs>,
-        cx: &mut ModelContext<Worktree>,
-    ) -> Self {
-        let (tree, scan_states_tx) =
-            LocalWorktree::new(path, languages, fs.clone(), Duration::ZERO, cx);
-        let background_snapshot = tree.background_snapshot.clone();
-        let fs = fs.clone();
-        let background = cx.background().clone();
-        cx.background()
-            .spawn(async move {
-                let events_rx = fs.events().await;
-                let scanner =
-                    BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
-                scanner.run_test(events_rx).await;
-            })
-            .detach();
+            scanner.run(events).await;
+        }));
         Worktree::Local(tree)
     }
 
@@ -831,15 +877,15 @@ impl Worktree {
     fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
         match self {
             Self::Local(worktree) => {
-                let poll_interval = worktree.poll_interval;
+                let is_fake_fs = worktree.fs.is_fake();
                 worktree.snapshot = worktree.background_snapshot.lock().clone();
                 if worktree.is_scanning() {
                     if !worktree.poll_scheduled {
                         cx.spawn(|this, mut cx| async move {
-                            if poll_interval.is_zero() {
+                            if is_fake_fs {
                                 smol::future::yield_now().await;
                             } else {
-                                smol::Timer::after(poll_interval).await;
+                                smol::Timer::after(Duration::from_millis(100)).await;
                             }
                             this.update(&mut cx, |this, cx| {
                                 this.as_local_mut().unwrap().poll_scheduled = false;
@@ -961,7 +1007,7 @@ pub struct LocalWorktree {
     background_snapshot: Arc<Mutex<Snapshot>>,
     snapshots_to_send_tx: Option<Sender<Snapshot>>,
     last_scan_state_rx: watch::Receiver<ScanState>,
-    _event_stream_handle: Option<fsevent::Handle>,
+    _background_scanner_task: Option<Task<()>>,
     poll_scheduled: bool,
     rpc: Option<(rpc::Client, u64)>,
     open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
@@ -969,7 +1015,6 @@ pub struct LocalWorktree {
     peers: HashMap<PeerId, ReplicaId>,
     languages: Arc<LanguageRegistry>,
     fs: Arc<dyn Fs>,
-    poll_interval: Duration,
 }
 
 impl LocalWorktree {
@@ -977,7 +1022,6 @@ impl LocalWorktree {
         path: impl Into<Arc<Path>>,
         languages: Arc<LanguageRegistry>,
         fs: Arc<dyn Fs>,
-        poll_interval: Duration,
         cx: &mut ModelContext<Worktree>,
     ) -> (Self, Sender<ScanState>) {
         let abs_path = path.into();
@@ -1002,7 +1046,7 @@ impl LocalWorktree {
             background_snapshot: Arc::new(Mutex::new(snapshot)),
             snapshots_to_send_tx: None,
             last_scan_state_rx,
-            _event_stream_handle: None,
+            _background_scanner_task: None,
             poll_scheduled: false,
             open_buffers: Default::default(),
             shared_buffers: Default::default(),
@@ -1010,7 +1054,6 @@ impl LocalWorktree {
             rpc: None,
             languages,
             fs,
-            poll_interval,
         };
 
         cx.spawn_weak(|this, mut cx| async move {
@@ -2158,40 +2201,7 @@ impl BackgroundScanner {
         self.snapshot.lock().clone()
     }
 
-    fn run(mut self, event_stream: fsevent::EventStream) {
-        if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() {
-            return;
-        }
-
-        if let Err(err) = smol::block_on(self.scan_dirs()) {
-            if smol::block_on(self.notify.send(ScanState::Err(Arc::new(err)))).is_err() {
-                return;
-            }
-        }
-
-        if smol::block_on(self.notify.send(ScanState::Idle)).is_err() {
-            return;
-        }
-
-        event_stream.run(move |events| {
-            if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() {
-                return false;
-            }
-
-            if !smol::block_on(self.process_events(events)) {
-                return false;
-            }
-
-            if smol::block_on(self.notify.send(ScanState::Idle)).is_err() {
-                return false;
-            }
-
-            true
-        });
-    }
-
-    #[cfg(any(test, feature = "test-support"))]
-    async fn run_test(mut self, mut events_rx: postage::broadcast::Receiver<fsevent::Event>) {
+    async fn run(mut self, events_rx: impl Stream<Item = Vec<fsevent::Event>>) {
         if self.notify.send(ScanState::Scanning).await.is_err() {
             return;
         }
@@ -2211,12 +2221,8 @@ impl BackgroundScanner {
             return;
         }
 
-        while let Some(event) = events_rx.recv().await {
-            let mut events = vec![event];
-            while let Ok(event) = events_rx.try_recv() {
-                events.push(event);
-            }
-
+        futures::pin_mut!(events_rx);
+        while let Some(events) = events_rx.next().await {
             if self.notify.send(ScanState::Scanning).await.is_err() {
                 break;
             }
@@ -2997,7 +3003,9 @@ mod tests {
         )
         .unwrap();
 
-        let tree = cx.add_model(|cx| Worktree::local(root_link_path, Default::default(), cx));
+        let tree = cx.add_model(|cx| {
+            Worktree::local(root_link_path, Default::default(), Arc::new(RealFs), cx)
+        });
 
         cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
             .await;
@@ -3039,7 +3047,14 @@ mod tests {
         let dir = temp_tree(json!({
             "file1": "the old contents",
         }));
-        let tree = cx.add_model(|cx| Worktree::local(dir.path(), app_state.languages.clone(), cx));
+        let tree = cx.add_model(|cx| {
+            Worktree::local(
+                dir.path(),
+                app_state.languages.clone(),
+                Arc::new(RealFs),
+                cx,
+            )
+        });
         let buffer = tree
             .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
             .await
@@ -3062,8 +3077,14 @@ mod tests {
         }));
         let file_path = dir.path().join("file1");
 
-        let tree =
-            cx.add_model(|cx| Worktree::local(file_path.clone(), app_state.languages.clone(), cx));
+        let tree = cx.add_model(|cx| {
+            Worktree::local(
+                file_path.clone(),
+                app_state.languages.clone(),
+                Arc::new(RealFs),
+                cx,
+            )
+        });
         cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
             .await;
         cx.read(|cx| assert_eq!(tree.read(cx).file_count(), 1));
@@ -3098,7 +3119,8 @@ mod tests {
             }
         }));
 
-        let tree = cx.add_model(|cx| Worktree::local(dir.path(), Default::default(), cx));
+        let tree = cx
+            .add_model(|cx| Worktree::local(dir.path(), Default::default(), Arc::new(RealFs), cx));
 
         let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| {
             let buffer = tree.update(cx, |tree, cx| tree.open_buffer(path, cx));
@@ -3245,7 +3267,8 @@ mod tests {
             }
         }));
 
-        let tree = cx.add_model(|cx| Worktree::local(dir.path(), Default::default(), cx));
+        let tree = cx
+            .add_model(|cx| Worktree::local(dir.path(), Default::default(), Arc::new(RealFs), cx));
         cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
             .await;
         tree.flush_fs_events(&cx).await;
@@ -3313,7 +3336,7 @@ mod tests {
                     next_entry_id: Default::default(),
                 })),
                 notify_tx,
-                Arc::new(ProductionFs),
+                Arc::new(RealFs),
                 Arc::new(gpui::executor::Background::new()),
             );
             smol::block_on(scanner.scan_dirs()).unwrap();