fs.rs

  1use anyhow::{anyhow, Result};
  2use fsevent::EventStream;
  3use futures::{Stream, StreamExt};
  4use postage::prelude::Sink as _;
  5use smol::io::{AsyncReadExt, AsyncWriteExt};
  6use std::{
  7    io,
  8    os::unix::fs::MetadataExt,
  9    path::{Path, PathBuf},
 10    pin::Pin,
 11    time::{Duration, SystemTime},
 12};
 13use text::Rope;
 14
 15#[async_trait::async_trait]
 16pub trait Fs: Send + Sync {
 17    async fn load(&self, path: &Path) -> Result<String>;
 18    async fn save(&self, path: &Path, text: &Rope) -> Result<()>;
 19    async fn canonicalize(&self, path: &Path) -> Result<PathBuf>;
 20    async fn is_file(&self, path: &Path) -> bool;
 21    async fn metadata(&self, path: &Path) -> Result<Option<Metadata>>;
 22    async fn read_dir(
 23        &self,
 24        path: &Path,
 25    ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>>;
 26    async fn watch(
 27        &self,
 28        path: &Path,
 29        latency: Duration,
 30    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>>;
 31    fn is_fake(&self) -> bool;
 32    #[cfg(any(test, feature = "test-support"))]
 33    fn as_fake(&self) -> &FakeFs;
 34}
 35
 36#[derive(Clone, Debug)]
 37pub struct Metadata {
 38    pub inode: u64,
 39    pub mtime: SystemTime,
 40    pub is_symlink: bool,
 41    pub is_dir: bool,
 42}
 43
 44pub struct RealFs;
 45
 46#[async_trait::async_trait]
 47impl Fs for RealFs {
 48    async fn load(&self, path: &Path) -> Result<String> {
 49        let mut file = smol::fs::File::open(path).await?;
 50        let mut text = String::new();
 51        file.read_to_string(&mut text).await?;
 52        Ok(text)
 53    }
 54
 55    async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
 56        let buffer_size = text.summary().bytes.min(10 * 1024);
 57        let file = smol::fs::File::create(path).await?;
 58        let mut writer = smol::io::BufWriter::with_capacity(buffer_size, file);
 59        for chunk in text.chunks() {
 60            writer.write_all(chunk.as_bytes()).await?;
 61        }
 62        writer.flush().await?;
 63        Ok(())
 64    }
 65
 66    async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
 67        Ok(smol::fs::canonicalize(path).await?)
 68    }
 69
 70    async fn is_file(&self, path: &Path) -> bool {
 71        smol::fs::metadata(path)
 72            .await
 73            .map_or(false, |metadata| metadata.is_file())
 74    }
 75
 76    async fn metadata(&self, path: &Path) -> Result<Option<Metadata>> {
 77        let symlink_metadata = match smol::fs::symlink_metadata(path).await {
 78            Ok(metadata) => metadata,
 79            Err(err) => {
 80                return match (err.kind(), err.raw_os_error()) {
 81                    (io::ErrorKind::NotFound, _) => Ok(None),
 82                    (io::ErrorKind::Other, Some(libc::ENOTDIR)) => Ok(None),
 83                    _ => Err(anyhow::Error::new(err)),
 84                }
 85            }
 86        };
 87
 88        let is_symlink = symlink_metadata.file_type().is_symlink();
 89        let metadata = if is_symlink {
 90            smol::fs::metadata(path).await?
 91        } else {
 92            symlink_metadata
 93        };
 94        Ok(Some(Metadata {
 95            inode: metadata.ino(),
 96            mtime: metadata.modified().unwrap(),
 97            is_symlink,
 98            is_dir: metadata.file_type().is_dir(),
 99        }))
100    }
101
102    async fn read_dir(
103        &self,
104        path: &Path,
105    ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>> {
106        let result = smol::fs::read_dir(path).await?.map(|entry| match entry {
107            Ok(entry) => Ok(entry.path()),
108            Err(error) => Err(anyhow!("failed to read dir entry {:?}", error)),
109        });
110        Ok(Box::pin(result))
111    }
112
113    async fn watch(
114        &self,
115        path: &Path,
116        latency: Duration,
117    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
118        let (mut tx, rx) = postage::mpsc::channel(64);
119        let (stream, handle) = EventStream::new(&[path], latency);
120        std::mem::forget(handle);
121        std::thread::spawn(move || {
122            stream.run(move |events| smol::block_on(tx.send(events)).is_ok());
123        });
124        Box::pin(rx)
125    }
126
127    fn is_fake(&self) -> bool {
128        false
129    }
130
131    #[cfg(any(test, feature = "test-support"))]
132    fn as_fake(&self) -> &FakeFs {
133        panic!("called `RealFs::as_fake`")
134    }
135}
136
137#[cfg(any(test, feature = "test-support"))]
138#[derive(Clone, Debug)]
139struct FakeFsEntry {
140    metadata: Metadata,
141    content: Option<String>,
142}
143
144#[cfg(any(test, feature = "test-support"))]
145struct FakeFsState {
146    entries: std::collections::BTreeMap<PathBuf, FakeFsEntry>,
147    next_inode: u64,
148    events_tx: postage::broadcast::Sender<Vec<fsevent::Event>>,
149}
150
151#[cfg(any(test, feature = "test-support"))]
152impl FakeFsState {
153    fn validate_path(&self, path: &Path) -> Result<()> {
154        if path.is_absolute()
155            && path
156                .parent()
157                .and_then(|path| self.entries.get(path))
158                .map_or(false, |e| e.metadata.is_dir)
159        {
160            Ok(())
161        } else {
162            Err(anyhow!("invalid path {:?}", path))
163        }
164    }
165
166    async fn emit_event(&mut self, paths: &[&Path]) {
167        let events = paths
168            .iter()
169            .map(|path| fsevent::Event {
170                event_id: 0,
171                flags: fsevent::StreamFlags::empty(),
172                path: path.to_path_buf(),
173            })
174            .collect();
175
176        let _ = self.events_tx.send(events).await;
177    }
178}
179
180#[cfg(any(test, feature = "test-support"))]
181pub struct FakeFs {
182    // Use an unfair lock to ensure tests are deterministic.
183    state: futures::lock::Mutex<FakeFsState>,
184}
185
186#[cfg(any(test, feature = "test-support"))]
187impl FakeFs {
188    pub fn new() -> Self {
189        let (events_tx, _) = postage::broadcast::channel(2048);
190        let mut entries = std::collections::BTreeMap::new();
191        entries.insert(
192            Path::new("/").to_path_buf(),
193            FakeFsEntry {
194                metadata: Metadata {
195                    inode: 0,
196                    mtime: SystemTime::now(),
197                    is_dir: true,
198                    is_symlink: false,
199                },
200                content: None,
201            },
202        );
203        Self {
204            state: futures::lock::Mutex::new(FakeFsState {
205                entries,
206                next_inode: 1,
207                events_tx,
208            }),
209        }
210    }
211
212    pub async fn insert_dir(&self, path: impl AsRef<Path>) -> Result<()> {
213        let mut state = self.state.lock().await;
214        let path = path.as_ref();
215        state.validate_path(path)?;
216
217        let inode = state.next_inode;
218        state.next_inode += 1;
219        state.entries.insert(
220            path.to_path_buf(),
221            FakeFsEntry {
222                metadata: Metadata {
223                    inode,
224                    mtime: SystemTime::now(),
225                    is_dir: true,
226                    is_symlink: false,
227                },
228                content: None,
229            },
230        );
231        state.emit_event(&[path]).await;
232        Ok(())
233    }
234
235    pub async fn insert_file(&self, path: impl AsRef<Path>, content: String) -> Result<()> {
236        let mut state = self.state.lock().await;
237        let path = path.as_ref();
238        state.validate_path(path)?;
239
240        let inode = state.next_inode;
241        state.next_inode += 1;
242        state.entries.insert(
243            path.to_path_buf(),
244            FakeFsEntry {
245                metadata: Metadata {
246                    inode,
247                    mtime: SystemTime::now(),
248                    is_dir: false,
249                    is_symlink: false,
250                },
251                content: Some(content),
252            },
253        );
254        state.emit_event(&[path]).await;
255        Ok(())
256    }
257
258    #[must_use]
259    pub fn insert_tree<'a>(
260        &'a self,
261        path: impl 'a + AsRef<Path> + Send,
262        tree: serde_json::Value,
263    ) -> futures::future::BoxFuture<'a, ()> {
264        use futures::FutureExt as _;
265        use serde_json::Value::*;
266
267        async move {
268            let path = path.as_ref();
269
270            match tree {
271                Object(map) => {
272                    self.insert_dir(path).await.unwrap();
273                    for (name, contents) in map {
274                        let mut path = PathBuf::from(path);
275                        path.push(name);
276                        self.insert_tree(&path, contents).await;
277                    }
278                }
279                Null => {
280                    self.insert_dir(&path).await.unwrap();
281                }
282                String(contents) => {
283                    self.insert_file(&path, contents).await.unwrap();
284                }
285                _ => {
286                    panic!("JSON object must contain only objects, strings, or null");
287                }
288            }
289        }
290        .boxed()
291    }
292
293    pub async fn remove(&self, path: &Path) -> Result<()> {
294        let mut state = self.state.lock().await;
295        state.validate_path(path)?;
296        state.entries.retain(|path, _| !path.starts_with(path));
297        state.emit_event(&[path]).await;
298        Ok(())
299    }
300
301    pub async fn rename(&self, source: &Path, target: &Path) -> Result<()> {
302        let mut state = self.state.lock().await;
303        state.validate_path(source)?;
304        state.validate_path(target)?;
305        if state.entries.contains_key(target) {
306            Err(anyhow!("target path already exists"))
307        } else {
308            let mut removed = Vec::new();
309            state.entries.retain(|path, entry| {
310                if let Ok(relative_path) = path.strip_prefix(source) {
311                    removed.push((relative_path.to_path_buf(), entry.clone()));
312                    false
313                } else {
314                    true
315                }
316            });
317
318            for (relative_path, entry) in removed {
319                let new_path = target.join(relative_path);
320                state.entries.insert(new_path, entry);
321            }
322
323            state.emit_event(&[source, target]).await;
324            Ok(())
325        }
326    }
327}
328
329#[cfg(any(test, feature = "test-support"))]
330#[async_trait::async_trait]
331impl Fs for FakeFs {
332    async fn load(&self, path: &Path) -> Result<String> {
333        let state = self.state.lock().await;
334        let text = state
335            .entries
336            .get(path)
337            .and_then(|e| e.content.as_ref())
338            .ok_or_else(|| anyhow!("file {:?} does not exist", path))?;
339        Ok(text.clone())
340    }
341
342    async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
343        let mut state = self.state.lock().await;
344        state.validate_path(path)?;
345        if let Some(entry) = state.entries.get_mut(path) {
346            if entry.metadata.is_dir {
347                Err(anyhow!("cannot overwrite a directory with a file"))
348            } else {
349                entry.content = Some(text.chunks().collect());
350                entry.metadata.mtime = SystemTime::now();
351                state.emit_event(&[path]).await;
352                Ok(())
353            }
354        } else {
355            let inode = state.next_inode;
356            state.next_inode += 1;
357            let entry = FakeFsEntry {
358                metadata: Metadata {
359                    inode,
360                    mtime: SystemTime::now(),
361                    is_dir: false,
362                    is_symlink: false,
363                },
364                content: Some(text.chunks().collect()),
365            };
366            state.entries.insert(path.to_path_buf(), entry);
367            state.emit_event(&[path]).await;
368            Ok(())
369        }
370    }
371
372    async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
373        Ok(path.to_path_buf())
374    }
375
376    async fn is_file(&self, path: &Path) -> bool {
377        let state = self.state.lock().await;
378        state
379            .entries
380            .get(path)
381            .map_or(false, |entry| !entry.metadata.is_dir)
382    }
383
384    async fn metadata(&self, path: &Path) -> Result<Option<Metadata>> {
385        let state = self.state.lock().await;
386        Ok(state.entries.get(path).map(|entry| entry.metadata.clone()))
387    }
388
389    async fn read_dir(
390        &self,
391        abs_path: &Path,
392    ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>> {
393        use futures::{future, stream};
394        let state = self.state.lock().await;
395        let abs_path = abs_path.to_path_buf();
396        Ok(Box::pin(stream::iter(state.entries.clone()).filter_map(
397            move |(child_path, _)| {
398                future::ready(if child_path.parent() == Some(&abs_path) {
399                    Some(Ok(child_path))
400                } else {
401                    None
402                })
403            },
404        )))
405    }
406
407    async fn watch(
408        &self,
409        path: &Path,
410        _: Duration,
411    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
412        let state = self.state.lock().await;
413        let rx = state.events_tx.subscribe();
414        let path = path.to_path_buf();
415        Box::pin(futures::StreamExt::filter(rx, move |events| {
416            let result = events.iter().any(|event| event.path.starts_with(&path));
417            async move { result }
418        }))
419    }
420
421    fn is_fake(&self) -> bool {
422        true
423    }
424
425    #[cfg(any(test, feature = "test-support"))]
426    fn as_fake(&self) -> &FakeFs {
427        self
428    }
429}