Rework `Worktree::scan_complete` to use a watch

Antonio Scandurra created

Change summary

zed/src/worktree.rs | 55 ++++++++++++++++------------------------------
1 file changed, 19 insertions(+), 36 deletions(-)

Detailed changes

zed/src/worktree.rs 🔗

@@ -6,20 +6,21 @@ use crate::{
     sum_tree::{self, Edit, SumTree},
 };
 use anyhow::{anyhow, Result};
-use futures_core::future::BoxFuture;
-pub use fuzzy::match_paths;
 use fuzzy::PathEntry;
+pub use fuzzy::{match_paths, PathMatch};
 use gpui::{scoped_pool, AppContext, Entity, ModelContext, ModelHandle, Task};
 use ignore::dir::{Ignore, IgnoreBuilder};
 use parking_lot::Mutex;
-use postage::{oneshot, prelude::Stream, sink::Sink};
+use postage::{
+    prelude::{Sink, Stream},
+    watch,
+};
 use smol::{channel::Sender, Timer};
 use std::{
     ffi::OsStr,
     fmt, fs,
     future::Future,
     io::{self, Read, Write},
-    mem,
     ops::{AddAssign, Deref},
     os::unix::fs::MetadataExt,
     path::{Path, PathBuf},
@@ -27,20 +28,17 @@ use std::{
     time::Duration,
 };
 
-pub use fuzzy::PathMatch;
-
-#[derive(Debug)]
+#[derive(Clone, Debug)]
 enum ScanState {
     Idle,
     Scanning,
-    Err(io::Error),
+    Err(Arc<io::Error>),
 }
 
 pub struct Worktree {
     snapshot: Snapshot,
     scanner: Arc<BackgroundScanner>,
-    scan_listeners: Mutex<Vec<postage::oneshot::Sender<()>>>,
-    scan_state: ScanState,
+    scan_state: (watch::Sender<ScanState>, watch::Receiver<ScanState>),
     poll_scheduled: bool,
 }
 
@@ -63,8 +61,7 @@ impl Worktree {
         let tree = Self {
             snapshot,
             scanner,
-            scan_listeners: Default::default(),
-            scan_state: ScanState::Scanning,
+            scan_state: watch::channel_with(ScanState::Scanning),
             poll_scheduled: false,
         };
 
@@ -77,20 +74,18 @@ impl Worktree {
         tree
     }
 
-    pub fn scan_complete(&self) -> BoxFuture<'static, ()> {
-        if self.is_scanning() {
-            let (tx, mut rx) = oneshot::channel::<()>();
-            self.scan_listeners.lock().push(tx);
-            Box::pin(async move {
-                rx.recv().await;
-            })
-        } else {
-            Box::pin(async {})
+    pub fn scan_complete(&self) -> impl Future<Output = ()> {
+        let mut scan_state_rx = self.scan_state.1.clone();
+        async move {
+            let mut next_scan_state = Some(scan_state_rx.borrow().clone());
+            while let Some(ScanState::Scanning) = next_scan_state {
+                next_scan_state = scan_state_rx.recv().await;
+            }
         }
     }
 
     fn observe_scan_state(&mut self, scan_state: ScanState, ctx: &mut ModelContext<Self>) {
-        self.scan_state = scan_state;
+        let _ = self.scan_state.0.blocking_send(scan_state);
         self.poll_entries(ctx);
     }
 
@@ -105,23 +100,11 @@ impl Worktree {
             })
             .detach();
             self.poll_scheduled = true;
-        } else {
-            let mut listeners = Vec::new();
-            mem::swap(self.scan_listeners.lock().as_mut(), &mut listeners);
-            ctx.spawn(
-                async move {
-                    for mut tx in listeners {
-                        tx.send(()).await.ok();
-                    }
-                },
-                |_, _, _| {},
-            )
-            .detach();
         }
     }
 
     fn is_scanning(&self) -> bool {
-        if let ScanState::Scanning = self.scan_state {
+        if let ScanState::Scanning = *self.scan_state.1.borrow() {
             true
         } else {
             false
@@ -554,7 +537,7 @@ impl BackgroundScanner {
         }
 
         if let Err(err) = self.scan_dirs() {
-            if smol::block_on(self.notify.send(ScanState::Err(err))).is_err() {
+            if smol::block_on(self.notify.send(ScanState::Err(Arc::new(err)))).is_err() {
                 return;
             }
         }