fs.rs

  1use anyhow::{anyhow, Result};
  2use fsevent::EventStream;
  3use futures::{Stream, StreamExt};
  4use smol::io::{AsyncReadExt, AsyncWriteExt};
  5use std::{
  6    io,
  7    os::unix::fs::MetadataExt,
  8    path::{Path, PathBuf},
  9    pin::Pin,
 10    time::{Duration, SystemTime},
 11};
 12use text::Rope;
 13
 14#[async_trait::async_trait]
 15pub trait Fs: Send + Sync {
 16    async fn load(&self, path: &Path) -> Result<String>;
 17    async fn save(&self, path: &Path, text: &Rope) -> Result<()>;
 18    async fn canonicalize(&self, path: &Path) -> Result<PathBuf>;
 19    async fn is_file(&self, path: &Path) -> bool;
 20    async fn metadata(&self, path: &Path) -> Result<Option<Metadata>>;
 21    async fn read_dir(
 22        &self,
 23        path: &Path,
 24    ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>>;
 25    async fn watch(
 26        &self,
 27        path: &Path,
 28        latency: Duration,
 29    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>>;
 30    fn is_fake(&self) -> bool;
 31    #[cfg(any(test, feature = "test-support"))]
 32    fn as_fake(&self) -> &FakeFs;
 33}
 34
 35#[derive(Clone, Debug)]
 36pub struct Metadata {
 37    pub inode: u64,
 38    pub mtime: SystemTime,
 39    pub is_symlink: bool,
 40    pub is_dir: bool,
 41}
 42
 43pub struct RealFs;
 44
 45#[async_trait::async_trait]
 46impl Fs for RealFs {
 47    async fn load(&self, path: &Path) -> Result<String> {
 48        let mut file = smol::fs::File::open(path).await?;
 49        let mut text = String::new();
 50        file.read_to_string(&mut text).await?;
 51        Ok(text)
 52    }
 53
 54    async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
 55        let buffer_size = text.summary().bytes.min(10 * 1024);
 56        let file = smol::fs::File::create(path).await?;
 57        let mut writer = smol::io::BufWriter::with_capacity(buffer_size, file);
 58        for chunk in text.chunks() {
 59            writer.write_all(chunk.as_bytes()).await?;
 60        }
 61        writer.flush().await?;
 62        Ok(())
 63    }
 64
 65    async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
 66        Ok(smol::fs::canonicalize(path).await?)
 67    }
 68
 69    async fn is_file(&self, path: &Path) -> bool {
 70        smol::fs::metadata(path)
 71            .await
 72            .map_or(false, |metadata| metadata.is_file())
 73    }
 74
 75    async fn metadata(&self, path: &Path) -> Result<Option<Metadata>> {
 76        let symlink_metadata = match smol::fs::symlink_metadata(path).await {
 77            Ok(metadata) => metadata,
 78            Err(err) => {
 79                return match (err.kind(), err.raw_os_error()) {
 80                    (io::ErrorKind::NotFound, _) => Ok(None),
 81                    (io::ErrorKind::Other, Some(libc::ENOTDIR)) => Ok(None),
 82                    _ => Err(anyhow::Error::new(err)),
 83                }
 84            }
 85        };
 86
 87        let is_symlink = symlink_metadata.file_type().is_symlink();
 88        let metadata = if is_symlink {
 89            smol::fs::metadata(path).await?
 90        } else {
 91            symlink_metadata
 92        };
 93        Ok(Some(Metadata {
 94            inode: metadata.ino(),
 95            mtime: metadata.modified().unwrap(),
 96            is_symlink,
 97            is_dir: metadata.file_type().is_dir(),
 98        }))
 99    }
100
101    async fn read_dir(
102        &self,
103        path: &Path,
104    ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>> {
105        let result = smol::fs::read_dir(path).await?.map(|entry| match entry {
106            Ok(entry) => Ok(entry.path()),
107            Err(error) => Err(anyhow!("failed to read dir entry {:?}", error)),
108        });
109        Ok(Box::pin(result))
110    }
111
112    async fn watch(
113        &self,
114        path: &Path,
115        latency: Duration,
116    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
117        let (tx, rx) = smol::channel::unbounded();
118        let (stream, handle) = EventStream::new(&[path], latency);
119        std::mem::forget(handle);
120        std::thread::spawn(move || {
121            stream.run(move |events| smol::block_on(tx.send(events)).is_ok());
122        });
123        Box::pin(rx)
124    }
125
126    fn is_fake(&self) -> bool {
127        false
128    }
129
130    #[cfg(any(test, feature = "test-support"))]
131    fn as_fake(&self) -> &FakeFs {
132        panic!("called `RealFs::as_fake`")
133    }
134}
135
136#[cfg(any(test, feature = "test-support"))]
137#[derive(Clone, Debug)]
138struct FakeFsEntry {
139    metadata: Metadata,
140    content: Option<String>,
141}
142
143#[cfg(any(test, feature = "test-support"))]
144struct FakeFsState {
145    entries: std::collections::BTreeMap<PathBuf, FakeFsEntry>,
146    next_inode: u64,
147    events_tx: postage::broadcast::Sender<Vec<fsevent::Event>>,
148}
149
150#[cfg(any(test, feature = "test-support"))]
151impl FakeFsState {
152    fn validate_path(&self, path: &Path) -> Result<()> {
153        if path.is_absolute()
154            && path
155                .parent()
156                .and_then(|path| self.entries.get(path))
157                .map_or(false, |e| e.metadata.is_dir)
158        {
159            Ok(())
160        } else {
161            Err(anyhow!("invalid path {:?}", path))
162        }
163    }
164
165    async fn emit_event(&mut self, paths: &[&Path]) {
166        use postage::prelude::Sink as _;
167
168        let events = paths
169            .iter()
170            .map(|path| fsevent::Event {
171                event_id: 0,
172                flags: fsevent::StreamFlags::empty(),
173                path: path.to_path_buf(),
174            })
175            .collect();
176
177        let _ = self.events_tx.send(events).await;
178    }
179}
180
181#[cfg(any(test, feature = "test-support"))]
182pub struct FakeFs {
183    // Use an unfair lock to ensure tests are deterministic.
184    state: futures::lock::Mutex<FakeFsState>,
185    executor: std::sync::Arc<gpui::executor::Background>,
186}
187
188#[cfg(any(test, feature = "test-support"))]
189impl FakeFs {
190    pub fn new(executor: std::sync::Arc<gpui::executor::Background>) -> Self {
191        let (events_tx, _) = postage::broadcast::channel(2048);
192        let mut entries = std::collections::BTreeMap::new();
193        entries.insert(
194            Path::new("/").to_path_buf(),
195            FakeFsEntry {
196                metadata: Metadata {
197                    inode: 0,
198                    mtime: SystemTime::now(),
199                    is_dir: true,
200                    is_symlink: false,
201                },
202                content: None,
203            },
204        );
205        Self {
206            executor,
207            state: futures::lock::Mutex::new(FakeFsState {
208                entries,
209                next_inode: 1,
210                events_tx,
211            }),
212        }
213    }
214
215    pub async fn insert_dir(&self, path: impl AsRef<Path>) -> Result<()> {
216        let mut state = self.state.lock().await;
217        let path = path.as_ref();
218        state.validate_path(path)?;
219
220        let inode = state.next_inode;
221        state.next_inode += 1;
222        state.entries.insert(
223            path.to_path_buf(),
224            FakeFsEntry {
225                metadata: Metadata {
226                    inode,
227                    mtime: SystemTime::now(),
228                    is_dir: true,
229                    is_symlink: false,
230                },
231                content: None,
232            },
233        );
234        state.emit_event(&[path]).await;
235        Ok(())
236    }
237
238    pub async fn insert_file(&self, path: impl AsRef<Path>, content: String) -> Result<()> {
239        let mut state = self.state.lock().await;
240        let path = path.as_ref();
241        state.validate_path(path)?;
242
243        let inode = state.next_inode;
244        state.next_inode += 1;
245        state.entries.insert(
246            path.to_path_buf(),
247            FakeFsEntry {
248                metadata: Metadata {
249                    inode,
250                    mtime: SystemTime::now(),
251                    is_dir: false,
252                    is_symlink: false,
253                },
254                content: Some(content),
255            },
256        );
257        state.emit_event(&[path]).await;
258        Ok(())
259    }
260
261    #[must_use]
262    pub fn insert_tree<'a>(
263        &'a self,
264        path: impl 'a + AsRef<Path> + Send,
265        tree: serde_json::Value,
266    ) -> futures::future::BoxFuture<'a, ()> {
267        use futures::FutureExt as _;
268        use serde_json::Value::*;
269
270        async move {
271            let path = path.as_ref();
272
273            match tree {
274                Object(map) => {
275                    self.insert_dir(path).await.unwrap();
276                    for (name, contents) in map {
277                        let mut path = PathBuf::from(path);
278                        path.push(name);
279                        self.insert_tree(&path, contents).await;
280                    }
281                }
282                Null => {
283                    self.insert_dir(&path).await.unwrap();
284                }
285                String(contents) => {
286                    self.insert_file(&path, contents).await.unwrap();
287                }
288                _ => {
289                    panic!("JSON object must contain only objects, strings, or null");
290                }
291            }
292        }
293        .boxed()
294    }
295
296    pub async fn remove(&self, path: &Path) -> Result<()> {
297        let mut state = self.state.lock().await;
298        state.validate_path(path)?;
299        state.entries.retain(|path, _| !path.starts_with(path));
300        state.emit_event(&[path]).await;
301        Ok(())
302    }
303
304    pub async fn rename(&self, source: &Path, target: &Path) -> Result<()> {
305        let mut state = self.state.lock().await;
306        state.validate_path(source)?;
307        state.validate_path(target)?;
308        if state.entries.contains_key(target) {
309            Err(anyhow!("target path already exists"))
310        } else {
311            let mut removed = Vec::new();
312            state.entries.retain(|path, entry| {
313                if let Ok(relative_path) = path.strip_prefix(source) {
314                    removed.push((relative_path.to_path_buf(), entry.clone()));
315                    false
316                } else {
317                    true
318                }
319            });
320
321            for (relative_path, entry) in removed {
322                let new_path = target.join(relative_path);
323                state.entries.insert(new_path, entry);
324            }
325
326            state.emit_event(&[source, target]).await;
327            Ok(())
328        }
329    }
330}
331
332#[cfg(any(test, feature = "test-support"))]
333#[async_trait::async_trait]
334impl Fs for FakeFs {
335    async fn load(&self, path: &Path) -> Result<String> {
336        self.executor.simulate_random_delay().await;
337        let state = self.state.lock().await;
338        let text = state
339            .entries
340            .get(path)
341            .and_then(|e| e.content.as_ref())
342            .ok_or_else(|| anyhow!("file {:?} does not exist", path))?;
343        Ok(text.clone())
344    }
345
346    async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
347        self.executor.simulate_random_delay().await;
348        let mut state = self.state.lock().await;
349        state.validate_path(path)?;
350        if let Some(entry) = state.entries.get_mut(path) {
351            if entry.metadata.is_dir {
352                Err(anyhow!("cannot overwrite a directory with a file"))
353            } else {
354                entry.content = Some(text.chunks().collect());
355                entry.metadata.mtime = SystemTime::now();
356                state.emit_event(&[path]).await;
357                Ok(())
358            }
359        } else {
360            let inode = state.next_inode;
361            state.next_inode += 1;
362            let entry = FakeFsEntry {
363                metadata: Metadata {
364                    inode,
365                    mtime: SystemTime::now(),
366                    is_dir: false,
367                    is_symlink: false,
368                },
369                content: Some(text.chunks().collect()),
370            };
371            state.entries.insert(path.to_path_buf(), entry);
372            state.emit_event(&[path]).await;
373            Ok(())
374        }
375    }
376
377    async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
378        self.executor.simulate_random_delay().await;
379        Ok(path.to_path_buf())
380    }
381
382    async fn is_file(&self, path: &Path) -> bool {
383        self.executor.simulate_random_delay().await;
384        let state = self.state.lock().await;
385        state
386            .entries
387            .get(path)
388            .map_or(false, |entry| !entry.metadata.is_dir)
389    }
390
391    async fn metadata(&self, path: &Path) -> Result<Option<Metadata>> {
392        self.executor.simulate_random_delay().await;
393        let state = self.state.lock().await;
394        Ok(state.entries.get(path).map(|entry| entry.metadata.clone()))
395    }
396
397    async fn read_dir(
398        &self,
399        abs_path: &Path,
400    ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>> {
401        use futures::{future, stream};
402        self.executor.simulate_random_delay().await;
403        let state = self.state.lock().await;
404        let abs_path = abs_path.to_path_buf();
405        Ok(Box::pin(stream::iter(state.entries.clone()).filter_map(
406            move |(child_path, _)| {
407                future::ready(if child_path.parent() == Some(&abs_path) {
408                    Some(Ok(child_path))
409                } else {
410                    None
411                })
412            },
413        )))
414    }
415
416    async fn watch(
417        &self,
418        path: &Path,
419        _: Duration,
420    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
421        let state = self.state.lock().await;
422        self.executor.simulate_random_delay().await;
423        let rx = state.events_tx.subscribe();
424        let path = path.to_path_buf();
425        Box::pin(futures::StreamExt::filter(rx, move |events| {
426            let result = events.iter().any(|event| event.path.starts_with(&path));
427            async move { result }
428        }))
429    }
430
431    fn is_fake(&self) -> bool {
432        true
433    }
434
435    #[cfg(any(test, feature = "test-support"))]
436    fn as_fake(&self) -> &FakeFs {
437        self
438    }
439}