@@ -86,7 +86,7 @@ pub trait Fs: Send + Sync {
async fn save(&self, path: &Path, text: &Rope) -> Result<()>;
async fn canonicalize(&self, path: &Path) -> Result<PathBuf>;
async fn is_file(&self, path: &Path) -> bool;
- fn watch(
+ async fn watch(
&self,
path: &Path,
latency: Duration,
@@ -203,7 +203,7 @@ impl Fs for RealFs {
.map_or(false, |metadata| metadata.is_file())
}
- fn watch(
+ async fn watch(
&self,
path: &Path,
latency: Duration,
@@ -269,7 +269,8 @@ impl FakeFsState {
#[cfg(any(test, feature = "test-support"))]
pub struct FakeFs {
- state: smol::lock::RwLock<FakeFsState>,
+ // Use an unfair lock to ensure tests are deterministic.
+ state: futures::lock::Mutex<FakeFsState>,
}
#[cfg(any(test, feature = "test-support"))]
@@ -288,7 +289,7 @@ impl FakeFs {
},
);
Self {
- state: smol::lock::RwLock::new(FakeFsState {
+ state: futures::lock::Mutex::new(FakeFsState {
entries,
next_inode: 1,
events_tx,
@@ -297,7 +298,7 @@ impl FakeFs {
}
pub async fn insert_dir(&self, path: impl AsRef<Path>) -> Result<()> {
- let mut state = self.state.write().await;
+ let mut state = self.state.lock().await;
let path = path.as_ref();
state.validate_path(path)?;
@@ -318,7 +319,7 @@ impl FakeFs {
}
pub async fn insert_file(&self, path: impl AsRef<Path>, content: String) -> Result<()> {
- let mut state = self.state.write().await;
+ let mut state = self.state.lock().await;
let path = path.as_ref();
state.validate_path(path)?;
@@ -339,7 +340,7 @@ impl FakeFs {
}
pub async fn remove(&self, path: &Path) -> Result<()> {
- let mut state = self.state.write().await;
+ let mut state = self.state.lock().await;
state.validate_path(path)?;
state.entries.retain(|path, _| !path.starts_with(path));
state.emit_event(&[path]).await;
@@ -347,7 +348,7 @@ impl FakeFs {
}
pub async fn rename(&self, source: &Path, target: &Path) -> Result<()> {
- let mut state = self.state.write().await;
+ let mut state = self.state.lock().await;
state.validate_path(source)?;
state.validate_path(target)?;
if state.entries.contains_key(target) {
@@ -384,7 +385,7 @@ impl Fs for FakeFs {
path: Arc<Path>,
abs_path: &Path,
) -> Result<Option<Entry>> {
- let state = self.state.read().await;
+ let state = self.state.lock().await;
if let Some(entry) = state.entries.get(abs_path) {
Ok(Some(Entry {
id: next_entry_id.fetch_add(1, SeqCst),
@@ -413,7 +414,7 @@ impl Fs for FakeFs {
) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>> {
use futures::{future, stream};
- let state = self.state.read().await;
+ let state = self.state.lock().await;
Ok(stream::iter(state.entries.clone())
.filter(move |(child_path, _)| future::ready(child_path.parent() == Some(abs_path)))
.then(move |(child_abs_path, child_entry)| async move {
@@ -437,7 +438,7 @@ impl Fs for FakeFs {
}
async fn load(&self, path: &Path) -> Result<String> {
- let state = self.state.read().await;
+ let state = self.state.lock().await;
let text = state
.entries
.get(path)
@@ -447,7 +448,7 @@ impl Fs for FakeFs {
}
async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
- let mut state = self.state.write().await;
+ let mut state = self.state.lock().await;
state.validate_path(path)?;
if let Some(entry) = state.entries.get_mut(path) {
if entry.is_dir {
@@ -479,16 +480,16 @@ impl Fs for FakeFs {
}
async fn is_file(&self, path: &Path) -> bool {
- let state = self.state.read().await;
+ let state = self.state.lock().await;
state.entries.get(path).map_or(false, |entry| !entry.is_dir)
}
- fn watch(
+ async fn watch(
&self,
path: &Path,
_: Duration,
) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
- let state = smol::block_on(self.state.read());
+ let state = self.state.lock().await;
let rx = state.events_tx.subscribe();
let path = path.to_path_buf();
Box::pin(futures::StreamExt::filter(rx, move |events| {
@@ -547,10 +548,10 @@ impl Worktree {
cx: &mut ModelContext<Worktree>,
) -> Self {
let (mut tree, scan_states_tx) = LocalWorktree::new(path, languages, fs.clone(), cx);
-
- let events = fs.watch(tree.snapshot.abs_path.as_ref(), Duration::from_millis(100));
+ let abs_path = tree.snapshot.abs_path.clone();
let background_snapshot = tree.background_snapshot.clone();
tree._background_scanner_task = Some(cx.background().spawn(async move {
+ let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
let scanner = BackgroundScanner::new(
background_snapshot,
scan_states_tx,
@@ -1059,20 +1060,26 @@ impl LocalWorktree {
cx.spawn_weak(|this, mut cx| async move {
while let Ok(scan_state) = scan_states_rx.recv().await {
if let Some(handle) = cx.read(|cx| this.upgrade(&cx)) {
- handle.update(&mut cx, |this, cx| {
+ let to_send = handle.update(&mut cx, |this, cx| {
last_scan_state_tx.blocking_send(scan_state).ok();
this.poll_snapshot(cx);
let tree = this.as_local_mut().unwrap();
if !tree.is_scanning() {
if let Some(snapshots_to_send_tx) = tree.snapshots_to_send_tx.clone() {
- if let Err(err) =
- smol::block_on(snapshots_to_send_tx.send(tree.snapshot()))
- {
- log::error!("error submitting snapshot to send {}", err);
- }
+ Some((tree.snapshot(), snapshots_to_send_tx))
+ } else {
+ None
}
+ } else {
+ None
}
});
+
+ if let Some((snapshot, snapshots_to_send_tx)) = to_send {
+ if let Err(err) = snapshots_to_send_tx.send(snapshot).await {
+ log::error!("error submitting snapshot to send {}", err);
+ }
+ }
} else {
break;
}