fs.rs

  1use super::editor::Rope;
  2use anyhow::{anyhow, Result};
  3use fsevent::EventStream;
  4use futures::{Stream, StreamExt};
  5use postage::prelude::Sink as _;
  6use smol::io::{AsyncReadExt, AsyncWriteExt};
  7use std::{
  8    io,
  9    os::unix::fs::MetadataExt,
 10    path::{Path, PathBuf},
 11    pin::Pin,
 12    time::{Duration, SystemTime},
 13};
 14
 15#[async_trait::async_trait]
 16pub trait Fs: Send + Sync {
 17    async fn load(&self, path: &Path) -> Result<String>;
 18    async fn save(&self, path: &Path, text: &Rope) -> Result<()>;
 19    async fn canonicalize(&self, path: &Path) -> Result<PathBuf>;
 20    async fn is_file(&self, path: &Path) -> bool;
 21    async fn metadata(&self, path: &Path) -> Result<Option<Metadata>>;
 22    async fn read_dir(
 23        &self,
 24        path: &Path,
 25    ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>>;
 26    async fn watch(
 27        &self,
 28        path: &Path,
 29        latency: Duration,
 30    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>>;
 31    fn is_fake(&self) -> bool;
 32}
 33
 34#[derive(Clone, Debug)]
 35pub struct Metadata {
 36    pub inode: u64,
 37    pub mtime: SystemTime,
 38    pub is_symlink: bool,
 39    pub is_dir: bool,
 40}
 41
 42pub struct RealFs;
 43
 44#[async_trait::async_trait]
 45impl Fs for RealFs {
 46    async fn load(&self, path: &Path) -> Result<String> {
 47        let mut file = smol::fs::File::open(path).await?;
 48        let mut text = String::new();
 49        file.read_to_string(&mut text).await?;
 50        Ok(text)
 51    }
 52
 53    async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
 54        let buffer_size = text.summary().bytes.min(10 * 1024);
 55        let file = smol::fs::File::create(path).await?;
 56        let mut writer = smol::io::BufWriter::with_capacity(buffer_size, file);
 57        for chunk in text.chunks() {
 58            writer.write_all(chunk.as_bytes()).await?;
 59        }
 60        writer.flush().await?;
 61        Ok(())
 62    }
 63
 64    async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
 65        Ok(smol::fs::canonicalize(path).await?)
 66    }
 67
 68    async fn is_file(&self, path: &Path) -> bool {
 69        smol::fs::metadata(path)
 70            .await
 71            .map_or(false, |metadata| metadata.is_file())
 72    }
 73
 74    async fn metadata(&self, path: &Path) -> Result<Option<Metadata>> {
 75        let symlink_metadata = match smol::fs::symlink_metadata(path).await {
 76            Ok(metadata) => metadata,
 77            Err(err) => {
 78                return match (err.kind(), err.raw_os_error()) {
 79                    (io::ErrorKind::NotFound, _) => Ok(None),
 80                    (io::ErrorKind::Other, Some(libc::ENOTDIR)) => Ok(None),
 81                    _ => Err(anyhow::Error::new(err)),
 82                }
 83            }
 84        };
 85
 86        let is_symlink = symlink_metadata.file_type().is_symlink();
 87        let metadata = if is_symlink {
 88            smol::fs::metadata(path).await?
 89        } else {
 90            symlink_metadata
 91        };
 92        Ok(Some(Metadata {
 93            inode: metadata.ino(),
 94            mtime: metadata.modified().unwrap(),
 95            is_symlink,
 96            is_dir: metadata.file_type().is_dir(),
 97        }))
 98    }
 99
100    async fn read_dir(
101        &self,
102        path: &Path,
103    ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>> {
104        let result = smol::fs::read_dir(path).await?.map(|entry| match entry {
105            Ok(entry) => Ok(entry.path()),
106            Err(error) => Err(anyhow!("failed to read dir entry {:?}", error)),
107        });
108        Ok(Box::pin(result))
109    }
110
111    async fn watch(
112        &self,
113        path: &Path,
114        latency: Duration,
115    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
116        let (mut tx, rx) = postage::mpsc::channel(64);
117        let (stream, handle) = EventStream::new(&[path], latency);
118        std::mem::forget(handle);
119        std::thread::spawn(move || {
120            stream.run(move |events| smol::block_on(tx.send(events)).is_ok());
121        });
122        Box::pin(rx)
123    }
124
125    fn is_fake(&self) -> bool {
126        false
127    }
128}
129
/// A single file or directory stored in the in-memory fake file system.
///
/// Gated like the rest of the fake file system so that it is not compiled (and flagged as
/// dead code) in builds without test support.
#[cfg(any(test, feature = "test-support"))]
#[derive(Clone, Debug)]
struct FakeFsEntry {
    /// Metadata reported for this entry.
    metadata: Metadata,
    /// Text of the file; `None` for directories.
    content: Option<String>,
}
135
/// Mutable state of the fake file system: its entries, inode counter and event broadcaster.
#[cfg(any(test, feature = "test-support"))]
struct FakeFsState {
    /// Every entry of the fake file system, keyed by its path.
    entries: std::collections::BTreeMap<PathBuf, FakeFsEntry>,
    /// Inode number handed to the next newly created entry.
    next_inode: u64,
    /// Sender through which batches of change events are broadcast.
    events_tx: postage::broadcast::Sender<Vec<fsevent::Event>>,
}

#[cfg(any(test, feature = "test-support"))]
impl FakeFsState {
    /// Checks that `path` is absolute and that its parent is an existing directory.
    ///
    /// The path itself does not have to exist.
    fn validate_path(&self, path: &Path) -> Result<()> {
        if !path.is_absolute() {
            return Err(anyhow!("invalid path {:?}", path));
        }

        let parent_entry = path.parent().and_then(|parent| self.entries.get(parent));
        match parent_entry {
            Some(entry) if entry.metadata.is_dir => Ok(()),
            _ => Err(anyhow!("invalid path {:?}", path)),
        }
    }

    /// Broadcasts one batch containing an event for each of `paths`.
    ///
    /// Every event carries id `0` and empty flags. A failure to send is ignored.
    async fn emit_event(&mut self, paths: &[&Path]) {
        let mut batch = Vec::with_capacity(paths.len());
        for changed_path in paths {
            batch.push(fsevent::Event {
                event_id: 0,
                flags: fsevent::StreamFlags::empty(),
                path: changed_path.to_path_buf(),
            });
        }

        let _ = self.events_tx.send(batch).await;
    }
}
171
/// In-memory [`Fs`] implementation for tests.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeFs {
    // Use an unfair lock to ensure tests are deterministic.
    state: futures::lock::Mutex<FakeFsState>,
}

#[cfg(any(test, feature = "test-support"))]
impl FakeFs {
    /// Creates a fake file system containing only the root directory `/` (inode 0).
    pub fn new() -> Self {
        let (events_tx, _) = postage::broadcast::channel(2048);
        let mut entries = std::collections::BTreeMap::new();
        entries.insert(
            Path::new("/").to_path_buf(),
            FakeFsEntry {
                metadata: Metadata {
                    inode: 0,
                    mtime: SystemTime::now(),
                    is_dir: true,
                    is_symlink: false,
                },
                content: None,
            },
        );
        Self {
            state: futures::lock::Mutex::new(FakeFsState {
                entries,
                next_inode: 1,
                events_tx,
            }),
        }
    }

    /// Inserts an entry at `path` with a fresh inode, replacing any entry already stored
    /// there, and emits a change event for `path`.
    async fn insert_entry(
        &self,
        path: &Path,
        is_dir: bool,
        content: Option<String>,
    ) -> Result<()> {
        let mut state = self.state.lock().await;
        state.validate_path(path)?;

        let inode = state.next_inode;
        state.next_inode += 1;
        state.entries.insert(
            path.to_path_buf(),
            FakeFsEntry {
                metadata: Metadata {
                    inode,
                    mtime: SystemTime::now(),
                    is_dir,
                    is_symlink: false,
                },
                content,
            },
        );
        state.emit_event(&[path]).await;
        Ok(())
    }

    /// Creates an empty directory at `path`.
    ///
    /// # Errors
    ///
    /// Fails if `path` is not absolute or its parent is not an existing directory.
    pub async fn insert_dir(&self, path: impl AsRef<Path>) -> Result<()> {
        self.insert_entry(path.as_ref(), true, None).await
    }

    /// Creates a file at `path` holding `content`.
    ///
    /// # Errors
    ///
    /// Fails if `path` is not absolute or its parent is not an existing directory.
    pub async fn insert_file(&self, path: impl AsRef<Path>, content: String) -> Result<()> {
        self.insert_entry(path.as_ref(), false, Some(content)).await
    }

    /// Recursively creates the entries described by `tree` at `path`.
    ///
    /// A JSON object becomes a directory whose keys name its children, `null` becomes an
    /// empty directory and a string becomes a file with that content.
    ///
    /// # Panics
    ///
    /// Panics if `tree` contains any other kind of JSON value, or if an entry cannot be
    /// inserted.
    #[must_use]
    pub fn insert_tree<'a>(
        &'a self,
        path: impl 'a + AsRef<Path> + Send,
        tree: serde_json::Value,
    ) -> futures::future::BoxFuture<'a, ()> {
        use futures::FutureExt as _;
        use serde_json::Value::*;

        async move {
            let path = path.as_ref();

            match tree {
                Object(map) => {
                    self.insert_dir(path).await.unwrap();
                    for (name, contents) in map {
                        let mut path = PathBuf::from(path);
                        path.push(name);
                        self.insert_tree(&path, contents).await;
                    }
                }
                Null => {
                    self.insert_dir(&path).await.unwrap();
                }
                String(contents) => {
                    self.insert_file(&path, contents).await.unwrap();
                }
                _ => {
                    panic!("JSON object must contain only objects, strings, or null");
                }
            }
        }
        .boxed()
    }

    /// Removes the entry at `path` together with everything beneath it, then emits a change
    /// event for `path`.
    ///
    /// # Errors
    ///
    /// Fails if `path` is not absolute or its parent is not an existing directory.
    pub async fn remove(&self, path: &Path) -> Result<()> {
        let mut state = self.state.lock().await;
        state.validate_path(path)?;
        // The closure parameter must not be named `path`: shadowing the argument would turn
        // the test into `p.starts_with(p)`, which is always true and would drop every entry.
        state
            .entries
            .retain(|entry_path, _| !entry_path.starts_with(path));
        state.emit_event(&[path]).await;
        Ok(())
    }

    /// Moves the entry at `source`, and everything beneath it, to `target`, then emits a
    /// change event covering both paths.
    ///
    /// # Errors
    ///
    /// Fails if either path is not absolute, if either parent is not an existing directory,
    /// or if an entry already exists at `target`.
    pub async fn rename(&self, source: &Path, target: &Path) -> Result<()> {
        let mut state = self.state.lock().await;
        state.validate_path(source)?;
        state.validate_path(target)?;
        if state.entries.contains_key(target) {
            return Err(anyhow!("target path already exists"));
        }

        // Pull out every entry at or below `source`, remembering its path relative to it.
        let mut removed = Vec::new();
        state.entries.retain(|path, entry| {
            if let Ok(relative_path) = path.strip_prefix(source) {
                removed.push((relative_path.to_path_buf(), entry.clone()));
                false
            } else {
                true
            }
        });

        // Re-insert the entries at the same relative positions below `target`.
        for (relative_path, entry) in removed {
            let new_path = target.join(relative_path);
            state.entries.insert(new_path, entry);
        }

        state.emit_event(&[source, target]).await;
        Ok(())
    }
}
320
#[cfg(any(test, feature = "test-support"))]
#[async_trait::async_trait]
impl Fs for FakeFs {
    /// Returns a copy of the content of the file at `path`.
    ///
    /// Fails if there is no entry at `path` or the entry has no content (a directory).
    async fn load(&self, path: &Path) -> Result<String> {
        let state = self.state.lock().await;
        let text = state
            .entries
            .get(path)
            .and_then(|e| e.content.as_ref())
            .ok_or_else(|| anyhow!("file {:?} does not exist", path))?;
        Ok(text.clone())
    }

    /// Stores `text` as the content of the file at `path`, creating the file if necessary,
    /// and emits a change event for `path`.
    ///
    /// An existing file keeps its inode and gets a new modification time; a new file gets a
    /// fresh inode.
    ///
    /// # Errors
    ///
    /// Fails if `path` is not absolute, if its parent is not an existing directory, or if
    /// `path` refers to a directory.
    async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
        let mut state = self.state.lock().await;
        state.validate_path(path)?;
        match state.entries.get_mut(path) {
            Some(entry) if entry.metadata.is_dir => {
                return Err(anyhow!("cannot overwrite a directory with a file"));
            }
            Some(entry) => {
                entry.content = Some(text.chunks().collect());
                entry.metadata.mtime = SystemTime::now();
            }
            None => {
                let inode = state.next_inode;
                state.next_inode += 1;
                let entry = FakeFsEntry {
                    metadata: Metadata {
                        inode,
                        mtime: SystemTime::now(),
                        is_dir: false,
                        is_symlink: false,
                    },
                    content: Some(text.chunks().collect()),
                };
                state.entries.insert(path.to_path_buf(), entry);
            }
        }
        state.emit_event(&[path]).await;
        Ok(())
    }

    /// Returns `path` unchanged; the fake file system performs no path resolution.
    async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
        Ok(path.to_path_buf())
    }

    /// Returns `true` if an entry exists at `path` and it is not a directory.
    async fn is_file(&self, path: &Path) -> bool {
        let state = self.state.lock().await;
        state
            .entries
            .get(path)
            .map_or(false, |entry| !entry.metadata.is_dir)
    }

    /// Returns a copy of the metadata stored for `path`, or `Ok(None)` if there is no entry.
    async fn metadata(&self, path: &Path) -> Result<Option<Metadata>> {
        let state = self.state.lock().await;
        Ok(state.entries.get(path).map(|entry| entry.metadata.clone()))
    }

    /// Returns a stream of the paths whose parent is exactly `abs_path`.
    ///
    /// The stream is empty when `abs_path` has no children, including when it does not exist.
    async fn read_dir(
        &self,
        abs_path: &Path,
    ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>> {
        let state = self.state.lock().await;
        // Snapshot only the matching child paths while the lock is held, instead of cloning
        // every entry (file contents included) just to filter them afterwards.
        let children: Vec<Result<PathBuf>> = state
            .entries
            .keys()
            .filter(|child_path| child_path.parent() == Some(abs_path))
            .map(|child_path| Ok(child_path.clone()))
            .collect();
        Ok(Box::pin(futures::stream::iter(children)))
    }

    /// Returns a stream of the event batches that contain at least one event at or below
    /// `path`. The latency argument is ignored.
    async fn watch(
        &self,
        path: &Path,
        _: Duration,
    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
        let state = self.state.lock().await;
        let rx = state.events_tx.subscribe();
        let path = path.to_path_buf();
        Box::pin(futures::StreamExt::filter(rx, move |events| {
            let result = events.iter().any(|event| event.path.starts_with(&path));
            async move { result }
        }))
    }

    /// Always `true`: this implementation is the in-memory fake.
    fn is_fake(&self) -> bool {
        true
    }
}