Use `async_broadcast` to emit fake FS events

Antonio Scandurra and Nathan Sobo created

Co-Authored-By: Nathan Sobo <nathan@zed.dev>

Change summary

Cargo.lock                |  1 
crates/project/Cargo.toml |  1 
crates/project/src/fs.rs  | 77 ++++++++++++++++++++++------------------
3 files changed, 45 insertions(+), 34 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -3568,6 +3568,7 @@ version = "0.1.0"
 dependencies = [
  "aho-corasick",
  "anyhow",
+ "async-broadcast",
  "async-trait",
  "client",
  "clock",

crates/project/Cargo.toml 🔗

@@ -28,6 +28,7 @@ sum_tree = { path = "../sum_tree" }
 util = { path = "../util" }
 aho-corasick = "0.7"
 anyhow = "1.0.38"
+async-broadcast = "0.3.4"
 async-trait = "0.1"
 futures = "0.3"
 ignore = "0.4"

crates/project/src/fs.rs 🔗

@@ -225,7 +225,6 @@ struct FakeFsEntry {
 struct FakeFsState {
     entries: std::collections::BTreeMap<PathBuf, FakeFsEntry>,
     next_inode: u64,
-    event_txs: Vec<smol::channel::Sender<Vec<fsevent::Event>>>,
 }
 
 #[cfg(any(test, feature = "test-support"))]
@@ -242,26 +241,6 @@ impl FakeFsState {
             Err(anyhow!("invalid path {:?}", path))
         }
     }
-
-    async fn emit_event<I, T>(&mut self, paths: I)
-    where
-        I: IntoIterator<Item = T>,
-        T: Into<PathBuf>,
-    {
-        let events = paths
-            .into_iter()
-            .map(|path| fsevent::Event {
-                event_id: 0,
-                flags: fsevent::StreamFlags::empty(),
-                path: path.into(),
-            })
-            .collect::<Vec<_>>();
-
-        self.event_txs.retain(|tx| {
-            let _ = tx.try_send(events.clone());
-            !tx.is_closed()
-        });
-    }
 }
 
 #[cfg(any(test, feature = "test-support"))]
@@ -269,6 +248,10 @@ pub struct FakeFs {
     // Use an unfair lock to ensure tests are deterministic.
     state: futures::lock::Mutex<FakeFsState>,
     executor: std::sync::Weak<gpui::executor::Background>,
+    events: (
+        async_broadcast::Sender<Vec<fsevent::Event>>,
+        async_broadcast::Receiver<Vec<fsevent::Event>>,
+    ),
 }
 
 #[cfg(any(test, feature = "test-support"))]
@@ -292,8 +275,8 @@ impl FakeFs {
             state: futures::lock::Mutex::new(FakeFsState {
                 entries,
                 next_inode: 1,
-                event_txs: Default::default(),
             }),
+            events: async_broadcast::broadcast(16),
         })
     }
 
@@ -316,7 +299,9 @@ impl FakeFs {
                 content: None,
             },
         );
-        state.emit_event(&[path]).await;
+
+        drop(state);
+        self.emit_event(&[path]).await;
     }
 
     pub async fn insert_file(&self, path: impl AsRef<Path>, content: String) {
@@ -338,7 +323,9 @@ impl FakeFs {
                 content: Some(content),
             },
         );
-        state.emit_event(&[path]).await;
+
+        drop(state);
+        self.emit_event(&[path]).await;
     }
 
     #[must_use]
@@ -383,6 +370,23 @@ impl FakeFs {
             .simulate_random_delay()
             .await;
     }
+
+    async fn emit_event<I, T>(&self, paths: I)
+    where
+        I: IntoIterator<Item = T>,
+        T: Into<PathBuf>,
+    {
+        let events = paths
+            .into_iter()
+            .map(|path| fsevent::Event {
+                event_id: 0,
+                flags: fsevent::StreamFlags::empty(),
+                path: path.into(),
+            })
+            .collect::<Vec<_>>();
+
+        let _ = self.events.0.broadcast(events).await;
+    }
 }
 
 #[cfg(any(test, feature = "test-support"))]
@@ -420,7 +424,8 @@ impl Fs for FakeFs {
                 ));
             }
         }
-        state.emit_event(&created_dir_paths).await;
+        drop(state);
+        self.emit_event(&created_dir_paths).await;
 
         Ok(())
     }
@@ -461,7 +466,8 @@ impl Fs for FakeFs {
             };
             state.entries.insert(path.to_path_buf(), entry);
         }
-        state.emit_event(&[path]).await;
+        drop(state);
+        self.emit_event(&[path]).await;
 
         Ok(())
     }
@@ -497,7 +503,8 @@ impl Fs for FakeFs {
             state.entries.insert(new_path, entry);
         }
 
-        state.emit_event(&[source, target]).await;
+        drop(state);
+        self.emit_event(&[source, target]).await;
         Ok(())
     }
 
@@ -522,7 +529,8 @@ impl Fs for FakeFs {
             }
 
             state.entries.retain(|path, _| !path.starts_with(path));
-            state.emit_event(&[path]).await;
+            drop(state);
+            self.emit_event(&[path]).await;
         } else if !options.ignore_if_not_exists {
             return Err(anyhow!("{path:?} does not exist"));
         }
@@ -540,7 +548,8 @@ impl Fs for FakeFs {
             }
 
             state.entries.remove(&path);
-            state.emit_event(&[path]).await;
+            drop(state);
+            self.emit_event(&[path]).await;
         } else if !options.ignore_if_not_exists {
             return Err(anyhow!("{path:?} does not exist"));
         }
@@ -575,7 +584,8 @@ impl Fs for FakeFs {
             } else {
                 entry.content = Some(text.chunks().collect());
                 entry.metadata.mtime = SystemTime::now();
-                state.emit_event(&[path]).await;
+                drop(state);
+                self.emit_event(&[path]).await;
                 Ok(())
             }
         } else {
@@ -591,7 +601,8 @@ impl Fs for FakeFs {
                 content: Some(text.chunks().collect()),
             };
             state.entries.insert(path.to_path_buf(), entry);
-            state.emit_event(&[path]).await;
+            drop(state);
+            self.emit_event(&[path]).await;
             Ok(())
         }
     }
@@ -642,10 +653,8 @@ impl Fs for FakeFs {
         path: &Path,
         _: Duration,
     ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
-        let mut state = self.state.lock().await;
         self.simulate_random_delay().await;
-        let (tx, rx) = smol::channel::unbounded();
-        state.event_txs.push(tx);
+        let rx = self.events.1.clone();
         let path = path.to_path_buf();
         Box::pin(futures::StreamExt::filter(rx, move |events| {
             let result = events.iter().any(|event| event.path.starts_with(&path));