fs.rs

  1use super::{char_bag::CharBag, char_bag_for_path, Entry, EntryKind, Rope};
  2use anyhow::{anyhow, Context, Result};
  3use atomic::Ordering::SeqCst;
  4use fsevent::EventStream;
  5use futures::{future::BoxFuture, Stream, StreamExt};
  6use postage::prelude::Sink as _;
  7use smol::io::{AsyncReadExt, AsyncWriteExt};
  8use std::{
  9    io,
 10    os::unix::fs::MetadataExt,
 11    path::{Path, PathBuf},
 12    pin::Pin,
 13    sync::{
 14        atomic::{self, AtomicUsize},
 15        Arc,
 16    },
 17    time::{Duration, SystemTime},
 18};
 19
/// Abstraction over the file-system operations used by this module, so that
/// callers can run against the real disk ([`RealFs`]) or an in-memory stand-in.
#[async_trait::async_trait]
pub trait Fs: Send + Sync {
    /// Describes whatever exists at `abs_path` as an [`Entry`] whose `path`
    /// field is the supplied `path`.
    ///
    /// The implementations in this file return `Ok(None)` when nothing exists
    /// at `abs_path` and draw the entry's id from `next_entry_id`.
    async fn entry(
        &self,
        root_char_bag: CharBag,
        next_entry_id: &AtomicUsize,
        path: Arc<Path>,
        abs_path: &Path,
    ) -> Result<Option<Entry>>;
    /// Streams one [`Entry`] per direct child of the directory at `abs_path`.
    ///
    /// Each child's `path` is `path` joined with the child's file name, and
    /// each id is drawn from `next_entry_id`.
    async fn child_entries<'a>(
        &self,
        root_char_bag: CharBag,
        next_entry_id: &'a AtomicUsize,
        path: &'a Path,
        abs_path: &'a Path,
    ) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>>;
    /// Reads the file at `path` into a `String`.
    async fn load(&self, path: &Path) -> Result<String>;
    /// Writes the contents of `text` to the file at `path`.
    async fn save(&self, path: &Path, text: &Rope) -> Result<()>;
    /// Resolves `path` to its canonical form.
    async fn canonicalize(&self, path: &Path) -> Result<PathBuf>;
    /// Reports whether `path` refers to a file; the implementations in this
    /// file answer `false` (rather than failing) when the path cannot be found.
    async fn is_file(&self, path: &Path) -> bool;
    /// Returns a stream of batches of file-system events concerning `path`.
    ///
    /// `RealFs` forwards `latency` to the underlying event stream; the fake
    /// implementation in this file ignores it.
    async fn watch(
        &self,
        path: &Path,
        latency: Duration,
    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>>;
    /// Returns `true` for the in-memory test implementation, `false` for `RealFs`.
    fn is_fake(&self) -> bool;
}
 47
/// [`Fs`] implementation that operates on the actual file system through
/// `smol::fs` and watches for changes with `fsevent`.
pub struct RealFs;
 49
 50#[async_trait::async_trait]
 51impl Fs for RealFs {
 52    async fn entry(
 53        &self,
 54        root_char_bag: CharBag,
 55        next_entry_id: &AtomicUsize,
 56        path: Arc<Path>,
 57        abs_path: &Path,
 58    ) -> Result<Option<Entry>> {
 59        let metadata = match smol::fs::metadata(&abs_path).await {
 60            Err(err) => {
 61                return match (err.kind(), err.raw_os_error()) {
 62                    (io::ErrorKind::NotFound, _) => Ok(None),
 63                    (io::ErrorKind::Other, Some(libc::ENOTDIR)) => Ok(None),
 64                    _ => Err(anyhow::Error::new(err)),
 65                }
 66            }
 67            Ok(metadata) => metadata,
 68        };
 69        let inode = metadata.ino();
 70        let mtime = metadata.modified()?;
 71        let is_symlink = smol::fs::symlink_metadata(&abs_path)
 72            .await
 73            .context("failed to read symlink metadata")?
 74            .file_type()
 75            .is_symlink();
 76
 77        let entry = Entry {
 78            id: next_entry_id.fetch_add(1, SeqCst),
 79            kind: if metadata.file_type().is_dir() {
 80                EntryKind::PendingDir
 81            } else {
 82                EntryKind::File(char_bag_for_path(root_char_bag, &path))
 83            },
 84            path: Arc::from(path),
 85            inode,
 86            mtime,
 87            is_symlink,
 88            is_ignored: false,
 89        };
 90
 91        Ok(Some(entry))
 92    }
 93
 94    async fn child_entries<'a>(
 95        &self,
 96        root_char_bag: CharBag,
 97        next_entry_id: &'a AtomicUsize,
 98        path: &'a Path,
 99        abs_path: &'a Path,
100    ) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>> {
101        let entries = smol::fs::read_dir(abs_path).await?;
102        Ok(entries
103            .then(move |entry| async move {
104                let child_entry = entry?;
105                let child_name = child_entry.file_name();
106                let child_path: Arc<Path> = path.join(&child_name).into();
107                let child_abs_path = abs_path.join(&child_name);
108                let child_is_symlink = child_entry.metadata().await?.file_type().is_symlink();
109                let child_metadata = smol::fs::metadata(child_abs_path).await?;
110                let child_inode = child_metadata.ino();
111                let child_mtime = child_metadata.modified()?;
112                Ok(Entry {
113                    id: next_entry_id.fetch_add(1, SeqCst),
114                    kind: if child_metadata.file_type().is_dir() {
115                        EntryKind::PendingDir
116                    } else {
117                        EntryKind::File(char_bag_for_path(root_char_bag, &child_path))
118                    },
119                    path: child_path,
120                    inode: child_inode,
121                    mtime: child_mtime,
122                    is_symlink: child_is_symlink,
123                    is_ignored: false,
124                })
125            })
126            .boxed())
127    }
128
129    async fn load(&self, path: &Path) -> Result<String> {
130        let mut file = smol::fs::File::open(path).await?;
131        let mut text = String::new();
132        file.read_to_string(&mut text).await?;
133        Ok(text)
134    }
135
136    async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
137        let buffer_size = text.summary().bytes.min(10 * 1024);
138        let file = smol::fs::File::create(path).await?;
139        let mut writer = smol::io::BufWriter::with_capacity(buffer_size, file);
140        for chunk in text.chunks() {
141            writer.write_all(chunk.as_bytes()).await?;
142        }
143        writer.flush().await?;
144        Ok(())
145    }
146
147    async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
148        Ok(smol::fs::canonicalize(path).await?)
149    }
150
151    async fn is_file(&self, path: &Path) -> bool {
152        smol::fs::metadata(path)
153            .await
154            .map_or(false, |metadata| metadata.is_file())
155    }
156
157    async fn watch(
158        &self,
159        path: &Path,
160        latency: Duration,
161    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
162        let (mut tx, rx) = postage::mpsc::channel(64);
163        let (stream, handle) = EventStream::new(&[path], latency);
164        std::mem::forget(handle);
165        std::thread::spawn(move || {
166            stream.run(move |events| smol::block_on(tx.send(events)).is_ok());
167        });
168        Box::pin(rx)
169    }
170
    /// Always `false`: this implementation talks to the real file system.
    fn is_fake(&self) -> bool {
        false
    }
174}
175
/// A single file or directory stored in the in-memory fake file system.
#[derive(Clone, Debug)]
struct FakeFsEntry {
    // Number assigned from the fake file system's inode counter at insertion.
    inode: u64,
    // Set to the current time on insertion and again whenever a file is saved.
    mtime: SystemTime,
    // `true` for directories, `false` for files.
    is_dir: bool,
    // Every insertion path in this file sets this to `false`.
    is_symlink: bool,
    // File text; directories are inserted with `None`.
    content: Option<String>,
}
184
/// Mutable state of the fake file system, kept behind the lock in `FakeFs`.
#[cfg(any(test, feature = "test-support"))]
struct FakeFsState {
    // Every file and directory, keyed by its path; `new` seeds it with "/".
    entries: std::collections::BTreeMap<PathBuf, FakeFsEntry>,
    // Inode handed to the next inserted entry; bumped on every insertion.
    next_inode: u64,
    // Broadcast channel that `emit_event` publishes to and `watch` subscribes to.
    events_tx: postage::broadcast::Sender<Vec<fsevent::Event>>,
}
191
192#[cfg(any(test, feature = "test-support"))]
193impl FakeFsState {
194    fn validate_path(&self, path: &Path) -> Result<()> {
195        if path.is_absolute()
196            && path
197                .parent()
198                .and_then(|path| self.entries.get(path))
199                .map_or(false, |e| e.is_dir)
200        {
201            Ok(())
202        } else {
203            Err(anyhow!("invalid path {:?}", path))
204        }
205    }
206
207    async fn emit_event(&mut self, paths: &[&Path]) {
208        let events = paths
209            .iter()
210            .map(|path| fsevent::Event {
211                event_id: 0,
212                flags: fsevent::StreamFlags::empty(),
213                path: path.to_path_buf(),
214            })
215            .collect();
216
217        let _ = self.events_tx.send(events).await;
218    }
219}
220
/// In-memory [`Fs`] implementation, compiled only for tests or with the
/// `test-support` feature. All of its state sits behind one async mutex.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeFs {
    // Use an unfair lock to ensure tests are deterministic.
    state: futures::lock::Mutex<FakeFsState>,
}
226
227#[cfg(any(test, feature = "test-support"))]
228impl FakeFs {
229    pub fn new() -> Self {
230        let (events_tx, _) = postage::broadcast::channel(2048);
231        let mut entries = std::collections::BTreeMap::new();
232        entries.insert(
233            Path::new("/").to_path_buf(),
234            FakeFsEntry {
235                inode: 0,
236                mtime: SystemTime::now(),
237                is_dir: true,
238                is_symlink: false,
239                content: None,
240            },
241        );
242        Self {
243            state: futures::lock::Mutex::new(FakeFsState {
244                entries,
245                next_inode: 1,
246                events_tx,
247            }),
248        }
249    }
250
251    pub async fn insert_dir(&self, path: impl AsRef<Path>) -> Result<()> {
252        let mut state = self.state.lock().await;
253        let path = path.as_ref();
254        state.validate_path(path)?;
255
256        let inode = state.next_inode;
257        state.next_inode += 1;
258        state.entries.insert(
259            path.to_path_buf(),
260            FakeFsEntry {
261                inode,
262                mtime: SystemTime::now(),
263                is_dir: true,
264                is_symlink: false,
265                content: None,
266            },
267        );
268        state.emit_event(&[path]).await;
269        Ok(())
270    }
271
272    pub async fn insert_file(&self, path: impl AsRef<Path>, content: String) -> Result<()> {
273        let mut state = self.state.lock().await;
274        let path = path.as_ref();
275        state.validate_path(path)?;
276
277        let inode = state.next_inode;
278        state.next_inode += 1;
279        state.entries.insert(
280            path.to_path_buf(),
281            FakeFsEntry {
282                inode,
283                mtime: SystemTime::now(),
284                is_dir: false,
285                is_symlink: false,
286                content: Some(content),
287            },
288        );
289        state.emit_event(&[path]).await;
290        Ok(())
291    }
292
293    pub fn insert_tree<'a>(
294        &'a self,
295        path: impl 'a + AsRef<Path> + Send,
296        tree: serde_json::Value,
297    ) -> BoxFuture<'a, ()> {
298        use futures::FutureExt as _;
299        use serde_json::Value::*;
300
301        async move {
302            let path = path.as_ref();
303
304            match tree {
305                Object(map) => {
306                    self.insert_dir(path).await.unwrap();
307                    for (name, contents) in map {
308                        let mut path = PathBuf::from(path);
309                        path.push(name);
310                        self.insert_tree(&path, contents).await;
311                    }
312                }
313                Null => {
314                    self.insert_dir(&path).await.unwrap();
315                }
316                String(contents) => {
317                    self.insert_file(&path, contents).await.unwrap();
318                }
319                _ => {
320                    panic!("JSON object must contain only objects, strings, or null");
321                }
322            }
323        }
324        .boxed()
325    }
326
327    pub async fn remove(&self, path: &Path) -> Result<()> {
328        let mut state = self.state.lock().await;
329        state.validate_path(path)?;
330        state.entries.retain(|path, _| !path.starts_with(path));
331        state.emit_event(&[path]).await;
332        Ok(())
333    }
334
335    pub async fn rename(&self, source: &Path, target: &Path) -> Result<()> {
336        let mut state = self.state.lock().await;
337        state.validate_path(source)?;
338        state.validate_path(target)?;
339        if state.entries.contains_key(target) {
340            Err(anyhow!("target path already exists"))
341        } else {
342            let mut removed = Vec::new();
343            state.entries.retain(|path, entry| {
344                if let Ok(relative_path) = path.strip_prefix(source) {
345                    removed.push((relative_path.to_path_buf(), entry.clone()));
346                    false
347                } else {
348                    true
349                }
350            });
351
352            for (relative_path, entry) in removed {
353                let new_path = target.join(relative_path);
354                state.entries.insert(new_path, entry);
355            }
356
357            state.emit_event(&[source, target]).await;
358            Ok(())
359        }
360    }
361}
362
363#[cfg(any(test, feature = "test-support"))]
364#[async_trait::async_trait]
365impl Fs for FakeFs {
366    async fn entry(
367        &self,
368        root_char_bag: CharBag,
369        next_entry_id: &AtomicUsize,
370        path: Arc<Path>,
371        abs_path: &Path,
372    ) -> Result<Option<Entry>> {
373        let state = self.state.lock().await;
374        if let Some(entry) = state.entries.get(abs_path) {
375            Ok(Some(Entry {
376                id: next_entry_id.fetch_add(1, SeqCst),
377                kind: if entry.is_dir {
378                    EntryKind::PendingDir
379                } else {
380                    EntryKind::File(char_bag_for_path(root_char_bag, &path))
381                },
382                path: Arc::from(path),
383                inode: entry.inode,
384                mtime: entry.mtime,
385                is_symlink: entry.is_symlink,
386                is_ignored: false,
387            }))
388        } else {
389            Ok(None)
390        }
391    }
392
393    async fn child_entries<'a>(
394        &self,
395        root_char_bag: CharBag,
396        next_entry_id: &'a AtomicUsize,
397        path: &'a Path,
398        abs_path: &'a Path,
399    ) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>> {
400        use futures::{future, stream};
401
402        let state = self.state.lock().await;
403        Ok(stream::iter(state.entries.clone())
404            .filter(move |(child_path, _)| future::ready(child_path.parent() == Some(abs_path)))
405            .then(move |(child_abs_path, child_entry)| async move {
406                smol::future::yield_now().await;
407                let child_path = Arc::from(path.join(child_abs_path.file_name().unwrap()));
408                Ok(Entry {
409                    id: next_entry_id.fetch_add(1, SeqCst),
410                    kind: if child_entry.is_dir {
411                        EntryKind::PendingDir
412                    } else {
413                        EntryKind::File(char_bag_for_path(root_char_bag, &child_path))
414                    },
415                    path: child_path,
416                    inode: child_entry.inode,
417                    mtime: child_entry.mtime,
418                    is_symlink: child_entry.is_symlink,
419                    is_ignored: false,
420                })
421            })
422            .boxed())
423    }
424
425    async fn load(&self, path: &Path) -> Result<String> {
426        let state = self.state.lock().await;
427        let text = state
428            .entries
429            .get(path)
430            .and_then(|e| e.content.as_ref())
431            .ok_or_else(|| anyhow!("file {:?} does not exist", path))?;
432        Ok(text.clone())
433    }
434
435    async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
436        let mut state = self.state.lock().await;
437        state.validate_path(path)?;
438        if let Some(entry) = state.entries.get_mut(path) {
439            if entry.is_dir {
440                Err(anyhow!("cannot overwrite a directory with a file"))
441            } else {
442                entry.content = Some(text.chunks().collect());
443                entry.mtime = SystemTime::now();
444                state.emit_event(&[path]).await;
445                Ok(())
446            }
447        } else {
448            let inode = state.next_inode;
449            state.next_inode += 1;
450            let entry = FakeFsEntry {
451                inode,
452                mtime: SystemTime::now(),
453                is_dir: false,
454                is_symlink: false,
455                content: Some(text.chunks().collect()),
456            };
457            state.entries.insert(path.to_path_buf(), entry);
458            state.emit_event(&[path]).await;
459            Ok(())
460        }
461    }
462
463    async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
464        Ok(path.to_path_buf())
465    }
466
467    async fn is_file(&self, path: &Path) -> bool {
468        let state = self.state.lock().await;
469        state.entries.get(path).map_or(false, |entry| !entry.is_dir)
470    }
471
472    async fn watch(
473        &self,
474        path: &Path,
475        _: Duration,
476    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
477        let state = self.state.lock().await;
478        let rx = state.events_tx.subscribe();
479        let path = path.to_path_buf();
480        Box::pin(futures::StreamExt::filter(rx, move |events| {
481            let result = events.iter().any(|event| event.path.starts_with(&path));
482            async move { result }
483        }))
484    }
485
    /// Always `true`: this is the in-memory implementation.
    fn is_fake(&self) -> bool {
        true
    }
489}