fs.rs

  1use anyhow::{anyhow, Result};
  2use buffer::Rope;
  3use fsevent::EventStream;
  4use futures::{Stream, StreamExt};
  5use postage::prelude::Sink as _;
  6use smol::io::{AsyncReadExt, AsyncWriteExt};
  7use std::{
  8    io,
  9    os::unix::fs::MetadataExt,
 10    path::{Path, PathBuf},
 11    pin::Pin,
 12    time::{Duration, SystemTime},
 13};
 14
/// Abstraction over the file-system operations used by this crate.
///
/// `RealFs` implements it on top of `smol::fs`; `FakeFs` (compiled only for
/// tests or with the `test-support` feature) implements it with an in-memory
/// map of entries.
#[async_trait::async_trait]
pub trait Fs: Send + Sync {
    /// Reads the file at `path` and returns its contents as a `String`.
    async fn load(&self, path: &Path) -> Result<String>;
    /// Writes the contents of `text` to the file at `path`.
    async fn save(&self, path: &Path, text: &Rope) -> Result<()>;
    /// Returns the canonical form of `path`.
    async fn canonicalize(&self, path: &Path) -> Result<PathBuf>;
    /// Returns `true` if `path` refers to a file. The implementations in this
    /// file return `false` when the path cannot be looked up.
    async fn is_file(&self, path: &Path) -> bool;
    /// Returns the metadata for `path`, or `Ok(None)` if nothing exists there.
    async fn metadata(&self, path: &Path) -> Result<Option<Metadata>>;
    /// Returns a stream yielding the paths of the entries directly inside `path`.
    async fn read_dir(
        &self,
        path: &Path,
    ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>>;
    /// Returns a stream of batches of file-system events concerning `path`.
    ///
    /// `RealFs` forwards `latency` to the underlying event stream; `FakeFs`
    /// ignores it.
    async fn watch(
        &self,
        path: &Path,
        latency: Duration,
    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>>;
    /// Returns `true` for the in-memory `FakeFs` and `false` for `RealFs`.
    fn is_fake(&self) -> bool;
    /// Returns this file system as a `FakeFs`. `RealFs` panics when this is called.
    #[cfg(any(test, feature = "test-support"))]
    fn as_fake(&self) -> &FakeFs;
}
 35
/// Metadata describing a single file-system entry, as returned by `Fs::metadata`.
#[derive(Clone, Debug)]
pub struct Metadata {
    /// Inode number of the entry (`FakeFs` assigns these from a counter).
    pub inode: u64,
    /// Time of the last modification.
    pub mtime: SystemTime,
    /// Whether the path itself is a symbolic link.
    pub is_symlink: bool,
    /// Whether the entry is a directory. For symlinks, `RealFs` takes this from
    /// `smol::fs::metadata` rather than from the link's own metadata.
    pub is_dir: bool,
}
 43
/// `Fs` implementation backed by the actual file system via `smol::fs`.
pub struct RealFs;
 45
 46#[async_trait::async_trait]
 47impl Fs for RealFs {
 48    async fn load(&self, path: &Path) -> Result<String> {
 49        let mut file = smol::fs::File::open(path).await?;
 50        let mut text = String::new();
 51        file.read_to_string(&mut text).await?;
 52        Ok(text)
 53    }
 54
 55    async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
 56        let buffer_size = text.summary().bytes.min(10 * 1024);
 57        let file = smol::fs::File::create(path).await?;
 58        let mut writer = smol::io::BufWriter::with_capacity(buffer_size, file);
 59        for chunk in text.chunks() {
 60            writer.write_all(chunk.as_bytes()).await?;
 61        }
 62        writer.flush().await?;
 63        Ok(())
 64    }
 65
 66    async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
 67        Ok(smol::fs::canonicalize(path).await?)
 68    }
 69
 70    async fn is_file(&self, path: &Path) -> bool {
 71        smol::fs::metadata(path)
 72            .await
 73            .map_or(false, |metadata| metadata.is_file())
 74    }
 75
 76    async fn metadata(&self, path: &Path) -> Result<Option<Metadata>> {
 77        let symlink_metadata = match smol::fs::symlink_metadata(path).await {
 78            Ok(metadata) => metadata,
 79            Err(err) => {
 80                return match (err.kind(), err.raw_os_error()) {
 81                    (io::ErrorKind::NotFound, _) => Ok(None),
 82                    (io::ErrorKind::Other, Some(libc::ENOTDIR)) => Ok(None),
 83                    _ => Err(anyhow::Error::new(err)),
 84                }
 85            }
 86        };
 87
 88        let is_symlink = symlink_metadata.file_type().is_symlink();
 89        let metadata = if is_symlink {
 90            smol::fs::metadata(path).await?
 91        } else {
 92            symlink_metadata
 93        };
 94        Ok(Some(Metadata {
 95            inode: metadata.ino(),
 96            mtime: metadata.modified().unwrap(),
 97            is_symlink,
 98            is_dir: metadata.file_type().is_dir(),
 99        }))
100    }
101
102    async fn read_dir(
103        &self,
104        path: &Path,
105    ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>> {
106        let result = smol::fs::read_dir(path).await?.map(|entry| match entry {
107            Ok(entry) => Ok(entry.path()),
108            Err(error) => Err(anyhow!("failed to read dir entry {:?}", error)),
109        });
110        Ok(Box::pin(result))
111    }
112
113    async fn watch(
114        &self,
115        path: &Path,
116        latency: Duration,
117    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
118        let (mut tx, rx) = postage::mpsc::channel(64);
119        let (stream, handle) = EventStream::new(&[path], latency);
120        std::mem::forget(handle);
121        std::thread::spawn(move || {
122            stream.run(move |events| smol::block_on(tx.send(events)).is_ok());
123        });
124        Box::pin(rx)
125    }
126
127    fn is_fake(&self) -> bool {
128        false
129    }
130
131    #[cfg(any(test, feature = "test-support"))]
132    fn as_fake(&self) -> &FakeFs {
133        panic!("called `RealFs::as_fake`")
134    }
135}
136
/// A single entry (file or directory) stored by the in-memory `FakeFs`.
///
/// Gated like `FakeFsState` and `FakeFs`, its only users, so that it is not
/// compiled as dead code in regular builds.
#[cfg(any(test, feature = "test-support"))]
#[derive(Clone, Debug)]
struct FakeFsEntry {
    /// Metadata reported for this entry.
    metadata: Metadata,
    /// Text content of the entry; `None` for entries inserted as directories.
    content: Option<String>,
}
142
/// Mutable state of a `FakeFs`, kept behind its lock.
#[cfg(any(test, feature = "test-support"))]
struct FakeFsState {
    /// Every entry of the fake file system, keyed by its path.
    entries: std::collections::BTreeMap<PathBuf, FakeFsEntry>,
    /// Inode number that will be given to the next newly created entry.
    next_inode: u64,
    /// Sender used to broadcast batches of change events to `watch` subscribers.
    events_tx: postage::broadcast::Sender<Vec<fsevent::Event>>,
}
149
#[cfg(any(test, feature = "test-support"))]
impl FakeFsState {
    /// Checks that `path` is absolute and that its parent is an existing
    /// directory entry.
    ///
    /// # Errors
    /// Returns an "invalid path" error when either condition does not hold.
    fn validate_path(&self, path: &Path) -> Result<()> {
        let parent_is_dir = match path.parent() {
            Some(parent) if path.is_absolute() => self
                .entries
                .get(parent)
                .map_or(false, |entry| entry.metadata.is_dir),
            _ => false,
        };

        if !parent_is_dir {
            return Err(anyhow!("invalid path {:?}", path));
        }
        Ok(())
    }

    /// Broadcasts one batch containing an event for each of `paths`.
    ///
    /// Each event has an id of `0` and empty flags. The result of the send is
    /// ignored.
    async fn emit_event(&mut self, paths: &[&Path]) {
        let mut events: Vec<fsevent::Event> = Vec::with_capacity(paths.len());
        for changed_path in paths {
            events.push(fsevent::Event {
                event_id: 0,
                flags: fsevent::StreamFlags::empty(),
                path: changed_path.to_path_buf(),
            });
        }

        let _ = self.events_tx.send(events).await;
    }
}
178
/// In-memory `Fs` implementation for tests; all of its state lives in a
/// `FakeFsState` behind an async mutex.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeFs {
    // Use an unfair lock to ensure tests are deterministic.
    state: futures::lock::Mutex<FakeFsState>,
}
184
#[cfg(any(test, feature = "test-support"))]
impl FakeFs {
    /// Creates a fake file system containing only the root directory `/`.
    pub fn new() -> Self {
        let (events_tx, _) = postage::broadcast::channel(2048);
        let mut entries = std::collections::BTreeMap::new();
        entries.insert(
            Path::new("/").to_path_buf(),
            FakeFsEntry {
                metadata: Metadata {
                    inode: 0,
                    mtime: SystemTime::now(),
                    is_dir: true,
                    is_symlink: false,
                },
                content: None,
            },
        );
        Self {
            state: futures::lock::Mutex::new(FakeFsState {
                entries,
                // Inode 0 is taken by the root directory.
                next_inode: 1,
                events_tx,
            }),
        }
    }

    /// Inserts a directory entry at `path` and emits an event for it.
    ///
    /// An existing entry at `path` is replaced.
    ///
    /// # Errors
    /// Fails if `path` is not absolute or its parent is not an existing directory.
    pub async fn insert_dir(&self, path: impl AsRef<Path>) -> Result<()> {
        let mut state = self.state.lock().await;
        let path = path.as_ref();
        state.validate_path(path)?;

        let inode = state.next_inode;
        state.next_inode += 1;
        state.entries.insert(
            path.to_path_buf(),
            FakeFsEntry {
                metadata: Metadata {
                    inode,
                    mtime: SystemTime::now(),
                    is_dir: true,
                    is_symlink: false,
                },
                content: None,
            },
        );
        state.emit_event(&[path]).await;
        Ok(())
    }

    /// Inserts a file entry at `path` holding `content` and emits an event for it.
    ///
    /// An existing entry at `path` is replaced.
    ///
    /// # Errors
    /// Fails if `path` is not absolute or its parent is not an existing directory.
    pub async fn insert_file(&self, path: impl AsRef<Path>, content: String) -> Result<()> {
        let mut state = self.state.lock().await;
        let path = path.as_ref();
        state.validate_path(path)?;

        let inode = state.next_inode;
        state.next_inode += 1;
        state.entries.insert(
            path.to_path_buf(),
            FakeFsEntry {
                metadata: Metadata {
                    inode,
                    mtime: SystemTime::now(),
                    is_dir: false,
                    is_symlink: false,
                },
                content: Some(content),
            },
        );
        state.emit_event(&[path]).await;
        Ok(())
    }

    /// Recursively inserts the entries described by the JSON value `tree` at `path`.
    ///
    /// A JSON object becomes a directory whose members are inserted beneath
    /// it, `null` becomes an empty directory, and a string becomes a file with
    /// that content.
    ///
    /// # Panics
    /// Panics if `tree` contains any other kind of JSON value, or if inserting
    /// an entry fails.
    #[must_use]
    pub fn insert_tree<'a>(
        &'a self,
        path: impl 'a + AsRef<Path> + Send,
        tree: serde_json::Value,
    ) -> futures::future::BoxFuture<'a, ()> {
        use futures::FutureExt as _;
        use serde_json::Value::*;

        // The future is boxed because the function calls itself recursively.
        async move {
            let path = path.as_ref();

            match tree {
                Object(map) => {
                    self.insert_dir(path).await.unwrap();
                    for (name, contents) in map {
                        let mut path = PathBuf::from(path);
                        path.push(name);
                        self.insert_tree(&path, contents).await;
                    }
                }
                Null => {
                    self.insert_dir(&path).await.unwrap();
                }
                String(contents) => {
                    self.insert_file(&path, contents).await.unwrap();
                }
                _ => {
                    panic!("JSON object must contain only objects, strings, or null");
                }
            }
        }
        .boxed()
    }

    /// Removes the entry at `path` together with everything beneath it and
    /// emits an event for `path`.
    ///
    /// # Errors
    /// Fails if `path` is not absolute or its parent is not an existing directory.
    pub async fn remove(&self, path: &Path) -> Result<()> {
        let mut state = self.state.lock().await;
        state.validate_path(path)?;
        // The closure parameter must not be named `path`: shadowing the
        // argument would compare every key with itself and delete all entries.
        state
            .entries
            .retain(|entry_path, _| !entry_path.starts_with(path));
        state.emit_event(&[path]).await;
        Ok(())
    }

    /// Moves the entry at `source`, and everything beneath it, to `target`,
    /// then emits a single batch with events for both paths.
    ///
    /// If no entry exists at or beneath `source`, nothing is moved but the
    /// events are still emitted.
    ///
    /// # Errors
    /// Fails if either path is not absolute, if the parent of either path is
    /// not an existing directory, or if an entry already exists at `target`.
    pub async fn rename(&self, source: &Path, target: &Path) -> Result<()> {
        let mut state = self.state.lock().await;
        state.validate_path(source)?;
        state.validate_path(target)?;
        if state.entries.contains_key(target) {
            Err(anyhow!("target path already exists"))
        } else {
            // Pull out every entry at or below `source`, remembering its path
            // relative to `source` so it can be re-rooted under `target`.
            let mut removed = Vec::new();
            state.entries.retain(|path, entry| {
                if let Ok(relative_path) = path.strip_prefix(source) {
                    removed.push((relative_path.to_path_buf(), entry.clone()));
                    false
                } else {
                    true
                }
            });

            for (relative_path, entry) in removed {
                let new_path = target.join(relative_path);
                state.entries.insert(new_path, entry);
            }

            state.emit_event(&[source, target]).await;
            Ok(())
        }
    }
}
327
#[cfg(any(test, feature = "test-support"))]
#[async_trait::async_trait]
impl Fs for FakeFs {
    /// Returns a copy of the content stored at `path`.
    ///
    /// # Errors
    /// Fails if there is no entry at `path` or the entry has no content.
    async fn load(&self, path: &Path) -> Result<String> {
        let state = self.state.lock().await;
        let stored = state
            .entries
            .get(path)
            .and_then(|entry| entry.content.as_ref());
        match stored {
            Some(text) => Ok(text.clone()),
            None => Err(anyhow!("file {:?} does not exist", path)),
        }
    }

    /// Stores the content of `text` at `path`, creating the file entry if it
    /// does not exist yet, and emits an event for `path`.
    ///
    /// # Errors
    /// Fails if `path` is invalid or refers to an existing directory.
    async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
        let mut state = self.state.lock().await;
        state.validate_path(path)?;

        match state.entries.get_mut(path) {
            Some(existing) => {
                if existing.metadata.is_dir {
                    return Err(anyhow!("cannot overwrite a directory with a file"));
                }
                existing.content = Some(text.chunks().collect());
                existing.metadata.mtime = SystemTime::now();
            }
            None => {
                // A brand-new file consumes the next inode number.
                let inode = state.next_inode;
                state.next_inode += 1;
                let created = FakeFsEntry {
                    metadata: Metadata {
                        inode,
                        mtime: SystemTime::now(),
                        is_dir: false,
                        is_symlink: false,
                    },
                    content: Some(text.chunks().collect()),
                };
                state.entries.insert(path.to_path_buf(), created);
            }
        }

        state.emit_event(&[path]).await;
        Ok(())
    }

    /// Returns `path` unchanged.
    async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
        Ok(path.to_path_buf())
    }

    /// Returns `true` if an entry exists at `path` and it is not a directory.
    async fn is_file(&self, path: &Path) -> bool {
        let state = self.state.lock().await;
        match state.entries.get(path) {
            Some(entry) => !entry.metadata.is_dir,
            None => false,
        }
    }

    /// Returns a copy of the metadata stored for `path`, or `Ok(None)` if no
    /// entry exists there.
    async fn metadata(&self, path: &Path) -> Result<Option<Metadata>> {
        let state = self.state.lock().await;
        let found = state.entries.get(path).map(|entry| entry.metadata.clone());
        Ok(found)
    }

    /// Returns a stream of the paths whose parent is exactly `abs_path`, taken
    /// from a snapshot of the entries at the time of the call.
    async fn read_dir(
        &self,
        abs_path: &Path,
    ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>> {
        let state = self.state.lock().await;
        let children: Vec<Result<PathBuf>> = state
            .entries
            .keys()
            .filter(|child_path| child_path.parent() == Some(abs_path))
            .map(|child_path| Ok(child_path.clone()))
            .collect();
        Ok(Box::pin(futures::stream::iter(children)))
    }

    /// Subscribes to the broadcast event channel and yields only the batches
    /// containing at least one event at or beneath `path`. The latency
    /// argument is ignored.
    async fn watch(
        &self,
        path: &Path,
        _: Duration,
    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
        let state = self.state.lock().await;
        let events_rx = state.events_tx.subscribe();
        let watched = path.to_path_buf();
        Box::pin(futures::StreamExt::filter(events_rx, move |batch| {
            let is_relevant = batch.iter().any(|event| event.path.starts_with(&watched));
            futures::future::ready(is_relevant)
        }))
    }

    /// Always `true`: this is the fake file system.
    fn is_fake(&self) -> bool {
        true
    }

    /// Returns `self`.
    #[cfg(any(test, feature = "test-support"))]
    fn as_fake(&self) -> &FakeFs {
        self
    }
}