Revert "Use `async_broadcast` to emit fake FS events"

Antonio Scandurra and Nathan Sobo created

This reverts commit 4cfd345f9d91ebd8e76a668f3494ecf2e45c4b9d, because
having a bounded broadcast introduces the possibility of waiting forever
when there isn't yet a receiver processing those events.

Co-Authored-By: Nathan Sobo <nathan@zed.dev>

Change summary

Cargo.lock                |  1 
crates/project/Cargo.toml |  1 
crates/project/src/fs.rs  | 77 ++++++++++++++++++----------------------
3 files changed, 34 insertions(+), 45 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -3568,7 +3568,6 @@ version = "0.1.0"
 dependencies = [
  "aho-corasick",
  "anyhow",
- "async-broadcast",
  "async-trait",
  "client",
  "clock",

crates/project/Cargo.toml 🔗

@@ -28,7 +28,6 @@ sum_tree = { path = "../sum_tree" }
 util = { path = "../util" }
 aho-corasick = "0.7"
 anyhow = "1.0.38"
-async-broadcast = "0.3.4"
 async-trait = "0.1"
 futures = "0.3"
 ignore = "0.4"

crates/project/src/fs.rs 🔗

@@ -225,6 +225,7 @@ struct FakeFsEntry {
 struct FakeFsState {
     entries: std::collections::BTreeMap<PathBuf, FakeFsEntry>,
     next_inode: u64,
+    event_txs: Vec<smol::channel::Sender<Vec<fsevent::Event>>>,
 }
 
 #[cfg(any(test, feature = "test-support"))]
@@ -241,6 +242,26 @@ impl FakeFsState {
             Err(anyhow!("invalid path {:?}", path))
         }
     }
+
+    async fn emit_event<I, T>(&mut self, paths: I)
+    where
+        I: IntoIterator<Item = T>,
+        T: Into<PathBuf>,
+    {
+        let events = paths
+            .into_iter()
+            .map(|path| fsevent::Event {
+                event_id: 0,
+                flags: fsevent::StreamFlags::empty(),
+                path: path.into(),
+            })
+            .collect::<Vec<_>>();
+
+        self.event_txs.retain(|tx| {
+            let _ = tx.try_send(events.clone());
+            !tx.is_closed()
+        });
+    }
 }
 
 #[cfg(any(test, feature = "test-support"))]
@@ -248,10 +269,6 @@ pub struct FakeFs {
     // Use an unfair lock to ensure tests are deterministic.
     state: futures::lock::Mutex<FakeFsState>,
     executor: std::sync::Weak<gpui::executor::Background>,
-    events: (
-        async_broadcast::Sender<Vec<fsevent::Event>>,
-        async_broadcast::Receiver<Vec<fsevent::Event>>,
-    ),
 }
 
 #[cfg(any(test, feature = "test-support"))]
@@ -275,8 +292,8 @@ impl FakeFs {
             state: futures::lock::Mutex::new(FakeFsState {
                 entries,
                 next_inode: 1,
+                event_txs: Default::default(),
             }),
-            events: async_broadcast::broadcast(16),
         })
     }
 
@@ -299,9 +316,7 @@ impl FakeFs {
                 content: None,
             },
         );
-
-        drop(state);
-        self.emit_event(&[path]).await;
+        state.emit_event(&[path]).await;
     }
 
     pub async fn insert_file(&self, path: impl AsRef<Path>, content: String) {
@@ -323,9 +338,7 @@ impl FakeFs {
                 content: Some(content),
             },
         );
-
-        drop(state);
-        self.emit_event(&[path]).await;
+        state.emit_event(&[path]).await;
     }
 
     #[must_use]
@@ -370,23 +383,6 @@ impl FakeFs {
             .simulate_random_delay()
             .await;
     }
-
-    async fn emit_event<I, T>(&self, paths: I)
-    where
-        I: IntoIterator<Item = T>,
-        T: Into<PathBuf>,
-    {
-        let events = paths
-            .into_iter()
-            .map(|path| fsevent::Event {
-                event_id: 0,
-                flags: fsevent::StreamFlags::empty(),
-                path: path.into(),
-            })
-            .collect::<Vec<_>>();
-
-        let _ = self.events.0.broadcast(events).await;
-    }
 }
 
 #[cfg(any(test, feature = "test-support"))]
@@ -424,8 +420,7 @@ impl Fs for FakeFs {
                 ));
             }
         }
-        drop(state);
-        self.emit_event(&created_dir_paths).await;
+        state.emit_event(&created_dir_paths).await;
 
         Ok(())
     }
@@ -466,8 +461,7 @@ impl Fs for FakeFs {
             };
             state.entries.insert(path.to_path_buf(), entry);
         }
-        drop(state);
-        self.emit_event(&[path]).await;
+        state.emit_event(&[path]).await;
 
         Ok(())
     }
@@ -503,8 +497,7 @@ impl Fs for FakeFs {
             state.entries.insert(new_path, entry);
         }
 
-        drop(state);
-        self.emit_event(&[source, target]).await;
+        state.emit_event(&[source, target]).await;
         Ok(())
     }
 
@@ -529,8 +522,7 @@ impl Fs for FakeFs {
             }
 
             state.entries.retain(|path, _| !path.starts_with(path));
-            drop(state);
-            self.emit_event(&[path]).await;
+            state.emit_event(&[path]).await;
         } else if !options.ignore_if_not_exists {
             return Err(anyhow!("{path:?} does not exist"));
         }
@@ -548,8 +540,7 @@ impl Fs for FakeFs {
             }
 
             state.entries.remove(&path);
-            drop(state);
-            self.emit_event(&[path]).await;
+            state.emit_event(&[path]).await;
         } else if !options.ignore_if_not_exists {
             return Err(anyhow!("{path:?} does not exist"));
         }
@@ -584,8 +575,7 @@ impl Fs for FakeFs {
             } else {
                 entry.content = Some(text.chunks().collect());
                 entry.metadata.mtime = SystemTime::now();
-                drop(state);
-                self.emit_event(&[path]).await;
+                state.emit_event(&[path]).await;
                 Ok(())
             }
         } else {
@@ -601,8 +591,7 @@ impl Fs for FakeFs {
                 content: Some(text.chunks().collect()),
             };
             state.entries.insert(path.to_path_buf(), entry);
-            drop(state);
-            self.emit_event(&[path]).await;
+            state.emit_event(&[path]).await;
             Ok(())
         }
     }
@@ -653,8 +642,10 @@ impl Fs for FakeFs {
         path: &Path,
         _: Duration,
     ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
+        let mut state = self.state.lock().await;
         self.simulate_random_delay().await;
-        let rx = self.events.1.clone();
+        let (tx, rx) = smol::channel::unbounded();
+        state.event_txs.push(tx);
         let path = path.to_path_buf();
         Box::pin(futures::StreamExt::filter(rx, move |events| {
             let result = events.iter().any(|event| event.path.starts_with(&path));