fs.rs

  1use anyhow::{anyhow, Result};
  2use fsevent::EventStream;
  3use futures::{Stream, StreamExt};
  4use postage::prelude::Sink as _;
  5use smol::io::{AsyncReadExt, AsyncWriteExt};
  6use std::{
  7    io,
  8    os::unix::fs::MetadataExt,
  9    path::{Path, PathBuf},
 10    pin::Pin,
 11    time::{Duration, SystemTime},
 12};
 13use text::Rope;
 14
 15#[async_trait::async_trait]
 16pub trait Fs: Send + Sync {
 17    async fn load(&self, path: &Path) -> Result<String>;
 18    async fn save(&self, path: &Path, text: &Rope) -> Result<()>;
 19    async fn canonicalize(&self, path: &Path) -> Result<PathBuf>;
 20    async fn is_file(&self, path: &Path) -> bool;
 21    async fn metadata(&self, path: &Path) -> Result<Option<Metadata>>;
 22    async fn read_dir(
 23        &self,
 24        path: &Path,
 25    ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>>;
 26    async fn watch(
 27        &self,
 28        path: &Path,
 29        latency: Duration,
 30    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>>;
 31    fn is_fake(&self) -> bool;
 32    #[cfg(any(test, feature = "test-support"))]
 33    fn as_fake(&self) -> &FakeFs;
 34}
 35
 36#[derive(Clone, Debug)]
 37pub struct Metadata {
 38    pub inode: u64,
 39    pub mtime: SystemTime,
 40    pub is_symlink: bool,
 41    pub is_dir: bool,
 42}
 43
 44pub struct RealFs;
 45
 46#[async_trait::async_trait]
 47impl Fs for RealFs {
 48    async fn load(&self, path: &Path) -> Result<String> {
 49        let mut file = smol::fs::File::open(path).await?;
 50        let mut text = String::new();
 51        file.read_to_string(&mut text).await?;
 52        Ok(text)
 53    }
 54
 55    async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
 56        let buffer_size = text.summary().bytes.min(10 * 1024);
 57        let file = smol::fs::File::create(path).await?;
 58        let mut writer = smol::io::BufWriter::with_capacity(buffer_size, file);
 59        for chunk in text.chunks() {
 60            writer.write_all(chunk.as_bytes()).await?;
 61        }
 62        writer.flush().await?;
 63        Ok(())
 64    }
 65
 66    async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
 67        Ok(smol::fs::canonicalize(path).await?)
 68    }
 69
 70    async fn is_file(&self, path: &Path) -> bool {
 71        smol::fs::metadata(path)
 72            .await
 73            .map_or(false, |metadata| metadata.is_file())
 74    }
 75
 76    async fn metadata(&self, path: &Path) -> Result<Option<Metadata>> {
 77        let symlink_metadata = match smol::fs::symlink_metadata(path).await {
 78            Ok(metadata) => metadata,
 79            Err(err) => {
 80                return match (err.kind(), err.raw_os_error()) {
 81                    (io::ErrorKind::NotFound, _) => Ok(None),
 82                    (io::ErrorKind::Other, Some(libc::ENOTDIR)) => Ok(None),
 83                    _ => Err(anyhow::Error::new(err)),
 84                }
 85            }
 86        };
 87
 88        let is_symlink = symlink_metadata.file_type().is_symlink();
 89        let metadata = if is_symlink {
 90            smol::fs::metadata(path).await?
 91        } else {
 92            symlink_metadata
 93        };
 94        Ok(Some(Metadata {
 95            inode: metadata.ino(),
 96            mtime: metadata.modified().unwrap(),
 97            is_symlink,
 98            is_dir: metadata.file_type().is_dir(),
 99        }))
100    }
101
102    async fn read_dir(
103        &self,
104        path: &Path,
105    ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>> {
106        let result = smol::fs::read_dir(path).await?.map(|entry| match entry {
107            Ok(entry) => Ok(entry.path()),
108            Err(error) => Err(anyhow!("failed to read dir entry {:?}", error)),
109        });
110        Ok(Box::pin(result))
111    }
112
113    async fn watch(
114        &self,
115        path: &Path,
116        latency: Duration,
117    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
118        let (mut tx, rx) = postage::mpsc::channel(64);
119        let (stream, handle) = EventStream::new(&[path], latency);
120        std::mem::forget(handle);
121        std::thread::spawn(move || {
122            stream.run(move |events| smol::block_on(tx.send(events)).is_ok());
123        });
124        Box::pin(rx)
125    }
126
127    fn is_fake(&self) -> bool {
128        false
129    }
130
131    #[cfg(any(test, feature = "test-support"))]
132    fn as_fake(&self) -> &FakeFs {
133        panic!("called `RealFs::as_fake`")
134    }
135}
136
137#[cfg(any(test, feature = "test-support"))]
138#[derive(Clone, Debug)]
139struct FakeFsEntry {
140    metadata: Metadata,
141    content: Option<String>,
142}
143
144#[cfg(any(test, feature = "test-support"))]
145struct FakeFsState {
146    entries: std::collections::BTreeMap<PathBuf, FakeFsEntry>,
147    next_inode: u64,
148    events_tx: postage::broadcast::Sender<Vec<fsevent::Event>>,
149}
150
151#[cfg(any(test, feature = "test-support"))]
152impl FakeFsState {
153    fn validate_path(&self, path: &Path) -> Result<()> {
154        if path.is_absolute()
155            && path
156                .parent()
157                .and_then(|path| self.entries.get(path))
158                .map_or(false, |e| e.metadata.is_dir)
159        {
160            Ok(())
161        } else {
162            Err(anyhow!("invalid path {:?}", path))
163        }
164    }
165
166    async fn emit_event(&mut self, paths: &[&Path]) {
167        let events = paths
168            .iter()
169            .map(|path| fsevent::Event {
170                event_id: 0,
171                flags: fsevent::StreamFlags::empty(),
172                path: path.to_path_buf(),
173            })
174            .collect();
175
176        let _ = self.events_tx.send(events).await;
177    }
178}
179
180#[cfg(any(test, feature = "test-support"))]
181pub struct FakeFs {
182    // Use an unfair lock to ensure tests are deterministic.
183    state: futures::lock::Mutex<FakeFsState>,
184    executor: std::sync::Arc<gpui::executor::Background>,
185}
186
187#[cfg(any(test, feature = "test-support"))]
188impl FakeFs {
189    pub fn new(executor: std::sync::Arc<gpui::executor::Background>) -> Self {
190        let (events_tx, _) = postage::broadcast::channel(2048);
191        let mut entries = std::collections::BTreeMap::new();
192        entries.insert(
193            Path::new("/").to_path_buf(),
194            FakeFsEntry {
195                metadata: Metadata {
196                    inode: 0,
197                    mtime: SystemTime::now(),
198                    is_dir: true,
199                    is_symlink: false,
200                },
201                content: None,
202            },
203        );
204        Self {
205            executor,
206            state: futures::lock::Mutex::new(FakeFsState {
207                entries,
208                next_inode: 1,
209                events_tx,
210            }),
211        }
212    }
213
214    pub async fn insert_dir(&self, path: impl AsRef<Path>) -> Result<()> {
215        let mut state = self.state.lock().await;
216        let path = path.as_ref();
217        state.validate_path(path)?;
218
219        let inode = state.next_inode;
220        state.next_inode += 1;
221        state.entries.insert(
222            path.to_path_buf(),
223            FakeFsEntry {
224                metadata: Metadata {
225                    inode,
226                    mtime: SystemTime::now(),
227                    is_dir: true,
228                    is_symlink: false,
229                },
230                content: None,
231            },
232        );
233        state.emit_event(&[path]).await;
234        Ok(())
235    }
236
237    pub async fn insert_file(&self, path: impl AsRef<Path>, content: String) -> Result<()> {
238        let mut state = self.state.lock().await;
239        let path = path.as_ref();
240        state.validate_path(path)?;
241
242        let inode = state.next_inode;
243        state.next_inode += 1;
244        state.entries.insert(
245            path.to_path_buf(),
246            FakeFsEntry {
247                metadata: Metadata {
248                    inode,
249                    mtime: SystemTime::now(),
250                    is_dir: false,
251                    is_symlink: false,
252                },
253                content: Some(content),
254            },
255        );
256        state.emit_event(&[path]).await;
257        Ok(())
258    }
259
260    #[must_use]
261    pub fn insert_tree<'a>(
262        &'a self,
263        path: impl 'a + AsRef<Path> + Send,
264        tree: serde_json::Value,
265    ) -> futures::future::BoxFuture<'a, ()> {
266        use futures::FutureExt as _;
267        use serde_json::Value::*;
268
269        async move {
270            let path = path.as_ref();
271
272            match tree {
273                Object(map) => {
274                    self.insert_dir(path).await.unwrap();
275                    for (name, contents) in map {
276                        let mut path = PathBuf::from(path);
277                        path.push(name);
278                        self.insert_tree(&path, contents).await;
279                    }
280                }
281                Null => {
282                    self.insert_dir(&path).await.unwrap();
283                }
284                String(contents) => {
285                    self.insert_file(&path, contents).await.unwrap();
286                }
287                _ => {
288                    panic!("JSON object must contain only objects, strings, or null");
289                }
290            }
291        }
292        .boxed()
293    }
294
295    pub async fn remove(&self, path: &Path) -> Result<()> {
296        let mut state = self.state.lock().await;
297        state.validate_path(path)?;
298        state.entries.retain(|path, _| !path.starts_with(path));
299        state.emit_event(&[path]).await;
300        Ok(())
301    }
302
303    pub async fn rename(&self, source: &Path, target: &Path) -> Result<()> {
304        let mut state = self.state.lock().await;
305        state.validate_path(source)?;
306        state.validate_path(target)?;
307        if state.entries.contains_key(target) {
308            Err(anyhow!("target path already exists"))
309        } else {
310            let mut removed = Vec::new();
311            state.entries.retain(|path, entry| {
312                if let Ok(relative_path) = path.strip_prefix(source) {
313                    removed.push((relative_path.to_path_buf(), entry.clone()));
314                    false
315                } else {
316                    true
317                }
318            });
319
320            for (relative_path, entry) in removed {
321                let new_path = target.join(relative_path);
322                state.entries.insert(new_path, entry);
323            }
324
325            state.emit_event(&[source, target]).await;
326            Ok(())
327        }
328    }
329}
330
331#[cfg(any(test, feature = "test-support"))]
332#[async_trait::async_trait]
333impl Fs for FakeFs {
334    async fn load(&self, path: &Path) -> Result<String> {
335        self.executor.simulate_random_delay().await;
336        let state = self.state.lock().await;
337        let text = state
338            .entries
339            .get(path)
340            .and_then(|e| e.content.as_ref())
341            .ok_or_else(|| anyhow!("file {:?} does not exist", path))?;
342        Ok(text.clone())
343    }
344
345    async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
346        self.executor.simulate_random_delay().await;
347        let mut state = self.state.lock().await;
348        state.validate_path(path)?;
349        if let Some(entry) = state.entries.get_mut(path) {
350            if entry.metadata.is_dir {
351                Err(anyhow!("cannot overwrite a directory with a file"))
352            } else {
353                entry.content = Some(text.chunks().collect());
354                entry.metadata.mtime = SystemTime::now();
355                state.emit_event(&[path]).await;
356                Ok(())
357            }
358        } else {
359            let inode = state.next_inode;
360            state.next_inode += 1;
361            let entry = FakeFsEntry {
362                metadata: Metadata {
363                    inode,
364                    mtime: SystemTime::now(),
365                    is_dir: false,
366                    is_symlink: false,
367                },
368                content: Some(text.chunks().collect()),
369            };
370            state.entries.insert(path.to_path_buf(), entry);
371            state.emit_event(&[path]).await;
372            Ok(())
373        }
374    }
375
376    async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
377        self.executor.simulate_random_delay().await;
378        Ok(path.to_path_buf())
379    }
380
381    async fn is_file(&self, path: &Path) -> bool {
382        self.executor.simulate_random_delay().await;
383        let state = self.state.lock().await;
384        state
385            .entries
386            .get(path)
387            .map_or(false, |entry| !entry.metadata.is_dir)
388    }
389
390    async fn metadata(&self, path: &Path) -> Result<Option<Metadata>> {
391        self.executor.simulate_random_delay().await;
392        let state = self.state.lock().await;
393        Ok(state.entries.get(path).map(|entry| entry.metadata.clone()))
394    }
395
396    async fn read_dir(
397        &self,
398        abs_path: &Path,
399    ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>> {
400        use futures::{future, stream};
401        self.executor.simulate_random_delay().await;
402        let state = self.state.lock().await;
403        let abs_path = abs_path.to_path_buf();
404        Ok(Box::pin(stream::iter(state.entries.clone()).filter_map(
405            move |(child_path, _)| {
406                future::ready(if child_path.parent() == Some(&abs_path) {
407                    Some(Ok(child_path))
408                } else {
409                    None
410                })
411            },
412        )))
413    }
414
415    async fn watch(
416        &self,
417        path: &Path,
418        _: Duration,
419    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
420        let state = self.state.lock().await;
421        self.executor.simulate_random_delay().await;
422        let rx = state.events_tx.subscribe();
423        let path = path.to_path_buf();
424        Box::pin(futures::StreamExt::filter(rx, move |events| {
425            let result = events.iter().any(|event| event.path.starts_with(&path));
426            async move { result }
427        }))
428    }
429
430    fn is_fake(&self) -> bool {
431        true
432    }
433
434    #[cfg(any(test, feature = "test-support"))]
435    fn as_fake(&self) -> &FakeFs {
436        self
437    }
438}