fs.rs

  1use super::{char_bag::CharBag, char_bag_for_path, Entry, EntryKind, Rope};
  2use anyhow::{anyhow, Context, Result};
  3use atomic::Ordering::SeqCst;
  4use fsevent::EventStream;
  5use futures::{future::BoxFuture, Stream, StreamExt};
  6use postage::prelude::Sink as _;
  7use smol::io::{AsyncReadExt, AsyncWriteExt};
  8use std::{
  9    io,
 10    os::unix::fs::MetadataExt,
 11    path::{Path, PathBuf},
 12    pin::Pin,
 13    sync::{
 14        atomic::{self, AtomicUsize},
 15        Arc,
 16    },
 17    time::{Duration, SystemTime},
 18};
 19
 20#[async_trait::async_trait]
 21pub trait Fs: Send + Sync {
 22    async fn entry(
 23        &self,
 24        root_char_bag: CharBag,
 25        next_entry_id: &AtomicUsize,
 26        path: Arc<Path>,
 27        abs_path: &Path,
 28    ) -> Result<Option<Entry>>;
 29    async fn child_entries<'a>(
 30        &self,
 31        root_char_bag: CharBag,
 32        next_entry_id: &'a AtomicUsize,
 33        path: &'a Path,
 34        abs_path: &'a Path,
 35    ) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>>;
 36    async fn load(&self, path: &Path) -> Result<String>;
 37    async fn save(&self, path: &Path, text: &Rope) -> Result<()>;
 38    async fn canonicalize(&self, path: &Path) -> Result<PathBuf>;
 39    async fn is_file(&self, path: &Path) -> bool;
 40    async fn watch(
 41        &self,
 42        path: &Path,
 43        latency: Duration,
 44    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>>;
 45    fn is_fake(&self) -> bool;
 46}
 47
 48pub struct RealFs;
 49
 50#[async_trait::async_trait]
 51impl Fs for RealFs {
 52    async fn entry(
 53        &self,
 54        root_char_bag: CharBag,
 55        next_entry_id: &AtomicUsize,
 56        path: Arc<Path>,
 57        abs_path: &Path,
 58    ) -> Result<Option<Entry>> {
 59        let metadata = match smol::fs::metadata(&abs_path).await {
 60            Err(err) => {
 61                return match (err.kind(), err.raw_os_error()) {
 62                    (io::ErrorKind::NotFound, _) => Ok(None),
 63                    (io::ErrorKind::Other, Some(libc::ENOTDIR)) => Ok(None),
 64                    _ => Err(anyhow::Error::new(err)),
 65                }
 66            }
 67            Ok(metadata) => metadata,
 68        };
 69        let inode = metadata.ino();
 70        let mtime = metadata.modified()?;
 71        let is_symlink = smol::fs::symlink_metadata(&abs_path)
 72            .await
 73            .context("failed to read symlink metadata")?
 74            .file_type()
 75            .is_symlink();
 76
 77        let entry = Entry {
 78            id: next_entry_id.fetch_add(1, SeqCst),
 79            kind: if metadata.file_type().is_dir() {
 80                EntryKind::PendingDir
 81            } else {
 82                EntryKind::File(char_bag_for_path(root_char_bag, &path))
 83            },
 84            path: Arc::from(path),
 85            inode,
 86            mtime,
 87            is_symlink,
 88            is_ignored: false,
 89        };
 90
 91        Ok(Some(entry))
 92    }
 93
 94    async fn child_entries<'a>(
 95        &self,
 96        root_char_bag: CharBag,
 97        next_entry_id: &'a AtomicUsize,
 98        path: &'a Path,
 99        abs_path: &'a Path,
100    ) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>> {
101        let entries = smol::fs::read_dir(abs_path).await?;
102        Ok(entries
103            .then(move |entry| async move {
104                let child_entry = entry?;
105                let child_name = child_entry.file_name();
106                let child_path: Arc<Path> = path.join(&child_name).into();
107                let child_abs_path = abs_path.join(&child_name);
108                let child_is_symlink = child_entry.metadata().await?.file_type().is_symlink();
109                let child_metadata = smol::fs::metadata(child_abs_path).await?;
110                let child_inode = child_metadata.ino();
111                let child_mtime = child_metadata.modified()?;
112                Ok(Entry {
113                    id: next_entry_id.fetch_add(1, SeqCst),
114                    kind: if child_metadata.file_type().is_dir() {
115                        EntryKind::PendingDir
116                    } else {
117                        EntryKind::File(char_bag_for_path(root_char_bag, &child_path))
118                    },
119                    path: child_path,
120                    inode: child_inode,
121                    mtime: child_mtime,
122                    is_symlink: child_is_symlink,
123                    is_ignored: false,
124                })
125            })
126            .boxed())
127    }
128
129    async fn load(&self, path: &Path) -> Result<String> {
130        let mut file = smol::fs::File::open(path).await?;
131        let mut text = String::new();
132        file.read_to_string(&mut text).await?;
133        Ok(text)
134    }
135
136    async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
137        let buffer_size = text.summary().bytes.min(10 * 1024);
138        let file = smol::fs::File::create(path).await?;
139        let mut writer = smol::io::BufWriter::with_capacity(buffer_size, file);
140        for chunk in text.chunks() {
141            writer.write_all(chunk.as_bytes()).await?;
142        }
143        writer.flush().await?;
144        Ok(())
145    }
146
147    async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
148        Ok(smol::fs::canonicalize(path).await?)
149    }
150
151    async fn is_file(&self, path: &Path) -> bool {
152        smol::fs::metadata(path)
153            .await
154            .map_or(false, |metadata| metadata.is_file())
155    }
156
157    async fn watch(
158        &self,
159        path: &Path,
160        latency: Duration,
161    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
162        let (mut tx, rx) = postage::mpsc::channel(64);
163        let (stream, handle) = EventStream::new(&[path], latency);
164        std::mem::forget(handle);
165        std::thread::spawn(move || {
166            stream.run(move |events| smol::block_on(tx.send(events)).is_ok());
167        });
168        Box::pin(rx)
169    }
170
171    fn is_fake(&self) -> bool {
172        false
173    }
174}
175
176#[derive(Clone, Debug)]
177struct FakeFsEntry {
178    inode: u64,
179    mtime: SystemTime,
180    is_dir: bool,
181    is_symlink: bool,
182    content: Option<String>,
183}
184
185#[cfg(any(test, feature = "test-support"))]
186struct FakeFsState {
187    entries: std::collections::BTreeMap<PathBuf, FakeFsEntry>,
188    next_inode: u64,
189    events_tx: postage::broadcast::Sender<Vec<fsevent::Event>>,
190}
191
192#[cfg(any(test, feature = "test-support"))]
193impl FakeFsState {
194    fn validate_path(&self, path: &Path) -> Result<()> {
195        if path.is_absolute()
196            && path
197                .parent()
198                .and_then(|path| self.entries.get(path))
199                .map_or(false, |e| e.is_dir)
200        {
201            Ok(())
202        } else {
203            Err(anyhow!("invalid path {:?}", path))
204        }
205    }
206
207    async fn emit_event(&mut self, paths: &[&Path]) {
208        let events = paths
209            .iter()
210            .map(|path| fsevent::Event {
211                event_id: 0,
212                flags: fsevent::StreamFlags::empty(),
213                path: path.to_path_buf(),
214            })
215            .collect();
216
217        let _ = self.events_tx.send(events).await;
218    }
219}
220
221#[cfg(any(test, feature = "test-support"))]
222pub struct FakeFs {
223    // Use an unfair lock to ensure tests are deterministic.
224    state: futures::lock::Mutex<FakeFsState>,
225}
226
227#[cfg(any(test, feature = "test-support"))]
228impl FakeFs {
229    pub fn new() -> Self {
230        let (events_tx, _) = postage::broadcast::channel(2048);
231        let mut entries = std::collections::BTreeMap::new();
232        entries.insert(
233            Path::new("/").to_path_buf(),
234            FakeFsEntry {
235                inode: 0,
236                mtime: SystemTime::now(),
237                is_dir: true,
238                is_symlink: false,
239                content: None,
240            },
241        );
242        Self {
243            state: futures::lock::Mutex::new(FakeFsState {
244                entries,
245                next_inode: 1,
246                events_tx,
247            }),
248        }
249    }
250
251    pub async fn insert_dir(&self, path: impl AsRef<Path>) -> Result<()> {
252        let mut state = self.state.lock().await;
253        let path = path.as_ref();
254        state.validate_path(path)?;
255
256        let inode = state.next_inode;
257        state.next_inode += 1;
258        state.entries.insert(
259            path.to_path_buf(),
260            FakeFsEntry {
261                inode,
262                mtime: SystemTime::now(),
263                is_dir: true,
264                is_symlink: false,
265                content: None,
266            },
267        );
268        state.emit_event(&[path]).await;
269        Ok(())
270    }
271
272    pub async fn insert_file(&self, path: impl AsRef<Path>, content: String) -> Result<()> {
273        let mut state = self.state.lock().await;
274        let path = path.as_ref();
275        state.validate_path(path)?;
276
277        let inode = state.next_inode;
278        state.next_inode += 1;
279        state.entries.insert(
280            path.to_path_buf(),
281            FakeFsEntry {
282                inode,
283                mtime: SystemTime::now(),
284                is_dir: false,
285                is_symlink: false,
286                content: Some(content),
287            },
288        );
289        state.emit_event(&[path]).await;
290        Ok(())
291    }
292
293    #[must_use]
294    pub fn insert_tree<'a>(
295        &'a self,
296        path: impl 'a + AsRef<Path> + Send,
297        tree: serde_json::Value,
298    ) -> BoxFuture<'a, ()> {
299        use futures::FutureExt as _;
300        use serde_json::Value::*;
301
302        async move {
303            let path = path.as_ref();
304
305            match tree {
306                Object(map) => {
307                    self.insert_dir(path).await.unwrap();
308                    for (name, contents) in map {
309                        let mut path = PathBuf::from(path);
310                        path.push(name);
311                        self.insert_tree(&path, contents).await;
312                    }
313                }
314                Null => {
315                    self.insert_dir(&path).await.unwrap();
316                }
317                String(contents) => {
318                    self.insert_file(&path, contents).await.unwrap();
319                }
320                _ => {
321                    panic!("JSON object must contain only objects, strings, or null");
322                }
323            }
324        }
325        .boxed()
326    }
327
328    pub async fn remove(&self, path: &Path) -> Result<()> {
329        let mut state = self.state.lock().await;
330        state.validate_path(path)?;
331        state.entries.retain(|path, _| !path.starts_with(path));
332        state.emit_event(&[path]).await;
333        Ok(())
334    }
335
336    pub async fn rename(&self, source: &Path, target: &Path) -> Result<()> {
337        let mut state = self.state.lock().await;
338        state.validate_path(source)?;
339        state.validate_path(target)?;
340        if state.entries.contains_key(target) {
341            Err(anyhow!("target path already exists"))
342        } else {
343            let mut removed = Vec::new();
344            state.entries.retain(|path, entry| {
345                if let Ok(relative_path) = path.strip_prefix(source) {
346                    removed.push((relative_path.to_path_buf(), entry.clone()));
347                    false
348                } else {
349                    true
350                }
351            });
352
353            for (relative_path, entry) in removed {
354                let new_path = target.join(relative_path);
355                state.entries.insert(new_path, entry);
356            }
357
358            state.emit_event(&[source, target]).await;
359            Ok(())
360        }
361    }
362}
363
364#[cfg(any(test, feature = "test-support"))]
365#[async_trait::async_trait]
366impl Fs for FakeFs {
367    async fn entry(
368        &self,
369        root_char_bag: CharBag,
370        next_entry_id: &AtomicUsize,
371        path: Arc<Path>,
372        abs_path: &Path,
373    ) -> Result<Option<Entry>> {
374        let state = self.state.lock().await;
375        if let Some(entry) = state.entries.get(abs_path) {
376            Ok(Some(Entry {
377                id: next_entry_id.fetch_add(1, SeqCst),
378                kind: if entry.is_dir {
379                    EntryKind::PendingDir
380                } else {
381                    EntryKind::File(char_bag_for_path(root_char_bag, &path))
382                },
383                path: Arc::from(path),
384                inode: entry.inode,
385                mtime: entry.mtime,
386                is_symlink: entry.is_symlink,
387                is_ignored: false,
388            }))
389        } else {
390            Ok(None)
391        }
392    }
393
394    async fn child_entries<'a>(
395        &self,
396        root_char_bag: CharBag,
397        next_entry_id: &'a AtomicUsize,
398        path: &'a Path,
399        abs_path: &'a Path,
400    ) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>> {
401        use futures::{future, stream};
402
403        let state = self.state.lock().await;
404        Ok(stream::iter(state.entries.clone())
405            .filter(move |(child_path, _)| future::ready(child_path.parent() == Some(abs_path)))
406            .then(move |(child_abs_path, child_entry)| async move {
407                smol::future::yield_now().await;
408                let child_path = Arc::from(path.join(child_abs_path.file_name().unwrap()));
409                Ok(Entry {
410                    id: next_entry_id.fetch_add(1, SeqCst),
411                    kind: if child_entry.is_dir {
412                        EntryKind::PendingDir
413                    } else {
414                        EntryKind::File(char_bag_for_path(root_char_bag, &child_path))
415                    },
416                    path: child_path,
417                    inode: child_entry.inode,
418                    mtime: child_entry.mtime,
419                    is_symlink: child_entry.is_symlink,
420                    is_ignored: false,
421                })
422            })
423            .boxed())
424    }
425
426    async fn load(&self, path: &Path) -> Result<String> {
427        let state = self.state.lock().await;
428        let text = state
429            .entries
430            .get(path)
431            .and_then(|e| e.content.as_ref())
432            .ok_or_else(|| anyhow!("file {:?} does not exist", path))?;
433        Ok(text.clone())
434    }
435
436    async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
437        let mut state = self.state.lock().await;
438        state.validate_path(path)?;
439        if let Some(entry) = state.entries.get_mut(path) {
440            if entry.is_dir {
441                Err(anyhow!("cannot overwrite a directory with a file"))
442            } else {
443                entry.content = Some(text.chunks().collect());
444                entry.mtime = SystemTime::now();
445                state.emit_event(&[path]).await;
446                Ok(())
447            }
448        } else {
449            let inode = state.next_inode;
450            state.next_inode += 1;
451            let entry = FakeFsEntry {
452                inode,
453                mtime: SystemTime::now(),
454                is_dir: false,
455                is_symlink: false,
456                content: Some(text.chunks().collect()),
457            };
458            state.entries.insert(path.to_path_buf(), entry);
459            state.emit_event(&[path]).await;
460            Ok(())
461        }
462    }
463
464    async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
465        Ok(path.to_path_buf())
466    }
467
468    async fn is_file(&self, path: &Path) -> bool {
469        let state = self.state.lock().await;
470        state.entries.get(path).map_or(false, |entry| !entry.is_dir)
471    }
472
473    async fn watch(
474        &self,
475        path: &Path,
476        _: Duration,
477    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
478        let state = self.state.lock().await;
479        let rx = state.events_tx.subscribe();
480        let path = path.to_path_buf();
481        Box::pin(futures::StreamExt::filter(rx, move |events| {
482            let result = events.iter().any(|event| event.path.starts_with(&path));
483            async move { result }
484        }))
485    }
486
487    fn is_fake(&self) -> bool {
488        true
489    }
490}