fs_watcher.rs

  1use notify::EventKind;
  2use parking_lot::Mutex;
  3use std::sync::{Arc, OnceLock};
  4use util::{ResultExt, paths::SanitizedPath};
  5
  6use crate::{PathEvent, PathEventKind, Watcher};
  7
  8pub struct FsWatcher {
  9    tx: smol::channel::Sender<()>,
 10    pending_path_events: Arc<Mutex<Vec<PathEvent>>>,
 11}
 12
 13impl FsWatcher {
 14    pub fn new(
 15        tx: smol::channel::Sender<()>,
 16        pending_path_events: Arc<Mutex<Vec<PathEvent>>>,
 17    ) -> Self {
 18        Self {
 19            tx,
 20            pending_path_events,
 21        }
 22    }
 23}
 24
 25impl Watcher for FsWatcher {
 26    fn add(&self, path: &std::path::Path) -> anyhow::Result<()> {
 27        let root_path = SanitizedPath::from(path);
 28
 29        let tx = self.tx.clone();
 30        let pending_paths = self.pending_path_events.clone();
 31
 32        use notify::Watcher;
 33
 34        global({
 35            |g| {
 36                g.add(move |event: &notify::Event| {
 37                    let kind = match event.kind {
 38                        EventKind::Create(_) => Some(PathEventKind::Created),
 39                        EventKind::Modify(_) => Some(PathEventKind::Changed),
 40                        EventKind::Remove(_) => Some(PathEventKind::Removed),
 41                        _ => None,
 42                    };
 43                    let mut path_events = event
 44                        .paths
 45                        .iter()
 46                        .filter_map(|event_path| {
 47                            let event_path = SanitizedPath::from(event_path);
 48                            event_path.starts_with(&root_path).then(|| PathEvent {
 49                                path: event_path.as_path().to_path_buf(),
 50                                kind,
 51                            })
 52                        })
 53                        .collect::<Vec<_>>();
 54
 55                    if !path_events.is_empty() {
 56                        path_events.sort();
 57                        let mut pending_paths = pending_paths.lock();
 58                        if pending_paths.is_empty() {
 59                            tx.try_send(()).ok();
 60                        }
 61                        util::extend_sorted(
 62                            &mut *pending_paths,
 63                            path_events,
 64                            usize::MAX,
 65                            |a, b| a.path.cmp(&b.path),
 66                        );
 67                    }
 68                })
 69            }
 70        })?;
 71
 72        global(|g| {
 73            g.watcher
 74                .lock()
 75                .watch(path, notify::RecursiveMode::NonRecursive)
 76        })??;
 77
 78        Ok(())
 79    }
 80
 81    fn remove(&self, path: &std::path::Path) -> anyhow::Result<()> {
 82        use notify::Watcher;
 83        Ok(global(|w| w.watcher.lock().unwatch(path))??)
 84    }
 85}
 86
 87pub struct GlobalWatcher {
 88    // two mutexes because calling watcher.add triggers an watcher.event, which needs watchers.
 89    #[cfg(target_os = "linux")]
 90    pub(super) watcher: Mutex<notify::INotifyWatcher>,
 91    #[cfg(target_os = "freebsd")]
 92    pub(super) watcher: Mutex<notify::KqueueWatcher>,
 93    #[cfg(target_os = "windows")]
 94    pub(super) watcher: Mutex<notify::ReadDirectoryChangesWatcher>,
 95    pub(super) watchers: Mutex<Vec<Box<dyn Fn(&notify::Event) + Send + Sync>>>,
 96}
 97
 98impl GlobalWatcher {
 99    pub(super) fn add(&self, cb: impl Fn(&notify::Event) + Send + Sync + 'static) {
100        self.watchers.lock().push(Box::new(cb))
101    }
102}
103
104static FS_WATCHER_INSTANCE: OnceLock<anyhow::Result<GlobalWatcher, notify::Error>> =
105    OnceLock::new();
106
107fn handle_event(event: Result<notify::Event, notify::Error>) {
108    // Filter out access events, which could lead to a weird bug on Linux after upgrading notify
109    // https://github.com/zed-industries/zed/actions/runs/14085230504/job/39449448832
110    let Some(event) = event
111        .log_err()
112        .filter(|event| !matches!(event.kind, EventKind::Access(_)))
113    else {
114        return;
115    };
116    global::<()>(move |watcher| {
117        for f in watcher.watchers.lock().iter() {
118            f(&event)
119        }
120    })
121    .log_err();
122}
123
124pub fn global<T>(f: impl FnOnce(&GlobalWatcher) -> T) -> anyhow::Result<T> {
125    let result = FS_WATCHER_INSTANCE.get_or_init(|| {
126        notify::recommended_watcher(handle_event).map(|file_watcher| GlobalWatcher {
127            watcher: Mutex::new(file_watcher),
128            watchers: Default::default(),
129        })
130    });
131    match result {
132        Ok(g) => Ok(f(g)),
133        Err(e) => Err(anyhow::anyhow!("{e}")),
134    }
135}