1use anyhow::{anyhow, Result};
2use buffer::Rope;
3use fsevent::EventStream;
4use futures::{Stream, StreamExt};
5use postage::prelude::Sink as _;
6use smol::io::{AsyncReadExt, AsyncWriteExt};
7use std::{
8 io,
9 os::unix::fs::MetadataExt,
10 path::{Path, PathBuf},
11 pin::Pin,
12 time::{Duration, SystemTime},
13};
14
15#[async_trait::async_trait]
16pub trait Fs: Send + Sync {
17 async fn load(&self, path: &Path) -> Result<String>;
18 async fn save(&self, path: &Path, text: &Rope) -> Result<()>;
19 async fn canonicalize(&self, path: &Path) -> Result<PathBuf>;
20 async fn is_file(&self, path: &Path) -> bool;
21 async fn metadata(&self, path: &Path) -> Result<Option<Metadata>>;
22 async fn read_dir(
23 &self,
24 path: &Path,
25 ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>>;
26 async fn watch(
27 &self,
28 path: &Path,
29 latency: Duration,
30 ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>>;
31 fn is_fake(&self) -> bool;
32 #[cfg(any(test, feature = "test-support"))]
33 fn as_fake(&self) -> &FakeFs;
34}
35
36#[derive(Clone, Debug)]
37pub struct Metadata {
38 pub inode: u64,
39 pub mtime: SystemTime,
40 pub is_symlink: bool,
41 pub is_dir: bool,
42}
43
44pub struct RealFs;
45
46#[async_trait::async_trait]
47impl Fs for RealFs {
48 async fn load(&self, path: &Path) -> Result<String> {
49 let mut file = smol::fs::File::open(path).await?;
50 let mut text = String::new();
51 file.read_to_string(&mut text).await?;
52 Ok(text)
53 }
54
55 async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
56 let buffer_size = text.summary().bytes.min(10 * 1024);
57 let file = smol::fs::File::create(path).await?;
58 let mut writer = smol::io::BufWriter::with_capacity(buffer_size, file);
59 for chunk in text.chunks() {
60 writer.write_all(chunk.as_bytes()).await?;
61 }
62 writer.flush().await?;
63 Ok(())
64 }
65
66 async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
67 Ok(smol::fs::canonicalize(path).await?)
68 }
69
70 async fn is_file(&self, path: &Path) -> bool {
71 smol::fs::metadata(path)
72 .await
73 .map_or(false, |metadata| metadata.is_file())
74 }
75
76 async fn metadata(&self, path: &Path) -> Result<Option<Metadata>> {
77 let symlink_metadata = match smol::fs::symlink_metadata(path).await {
78 Ok(metadata) => metadata,
79 Err(err) => {
80 return match (err.kind(), err.raw_os_error()) {
81 (io::ErrorKind::NotFound, _) => Ok(None),
82 (io::ErrorKind::Other, Some(libc::ENOTDIR)) => Ok(None),
83 _ => Err(anyhow::Error::new(err)),
84 }
85 }
86 };
87
88 let is_symlink = symlink_metadata.file_type().is_symlink();
89 let metadata = if is_symlink {
90 smol::fs::metadata(path).await?
91 } else {
92 symlink_metadata
93 };
94 Ok(Some(Metadata {
95 inode: metadata.ino(),
96 mtime: metadata.modified().unwrap(),
97 is_symlink,
98 is_dir: metadata.file_type().is_dir(),
99 }))
100 }
101
102 async fn read_dir(
103 &self,
104 path: &Path,
105 ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>> {
106 let result = smol::fs::read_dir(path).await?.map(|entry| match entry {
107 Ok(entry) => Ok(entry.path()),
108 Err(error) => Err(anyhow!("failed to read dir entry {:?}", error)),
109 });
110 Ok(Box::pin(result))
111 }
112
113 async fn watch(
114 &self,
115 path: &Path,
116 latency: Duration,
117 ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
118 let (mut tx, rx) = postage::mpsc::channel(64);
119 let (stream, handle) = EventStream::new(&[path], latency);
120 std::mem::forget(handle);
121 std::thread::spawn(move || {
122 stream.run(move |events| smol::block_on(tx.send(events)).is_ok());
123 });
124 Box::pin(rx)
125 }
126
127 fn is_fake(&self) -> bool {
128 false
129 }
130
131 #[cfg(any(test, feature = "test-support"))]
132 fn as_fake(&self) -> &FakeFs {
133 panic!("called `RealFs::as_fake`")
134 }
135}
136
137#[derive(Clone, Debug)]
138struct FakeFsEntry {
139 metadata: Metadata,
140 content: Option<String>,
141}
142
143#[cfg(any(test, feature = "test-support"))]
144struct FakeFsState {
145 entries: std::collections::BTreeMap<PathBuf, FakeFsEntry>,
146 next_inode: u64,
147 events_tx: postage::broadcast::Sender<Vec<fsevent::Event>>,
148}
149
150#[cfg(any(test, feature = "test-support"))]
151impl FakeFsState {
152 fn validate_path(&self, path: &Path) -> Result<()> {
153 if path.is_absolute()
154 && path
155 .parent()
156 .and_then(|path| self.entries.get(path))
157 .map_or(false, |e| e.metadata.is_dir)
158 {
159 Ok(())
160 } else {
161 Err(anyhow!("invalid path {:?}", path))
162 }
163 }
164
165 async fn emit_event(&mut self, paths: &[&Path]) {
166 let events = paths
167 .iter()
168 .map(|path| fsevent::Event {
169 event_id: 0,
170 flags: fsevent::StreamFlags::empty(),
171 path: path.to_path_buf(),
172 })
173 .collect();
174
175 let _ = self.events_tx.send(events).await;
176 }
177}
178
179#[cfg(any(test, feature = "test-support"))]
180pub struct FakeFs {
181 // Use an unfair lock to ensure tests are deterministic.
182 state: futures::lock::Mutex<FakeFsState>,
183}
184
185#[cfg(any(test, feature = "test-support"))]
186impl FakeFs {
187 pub fn new() -> Self {
188 let (events_tx, _) = postage::broadcast::channel(2048);
189 let mut entries = std::collections::BTreeMap::new();
190 entries.insert(
191 Path::new("/").to_path_buf(),
192 FakeFsEntry {
193 metadata: Metadata {
194 inode: 0,
195 mtime: SystemTime::now(),
196 is_dir: true,
197 is_symlink: false,
198 },
199 content: None,
200 },
201 );
202 Self {
203 state: futures::lock::Mutex::new(FakeFsState {
204 entries,
205 next_inode: 1,
206 events_tx,
207 }),
208 }
209 }
210
211 pub async fn insert_dir(&self, path: impl AsRef<Path>) -> Result<()> {
212 let mut state = self.state.lock().await;
213 let path = path.as_ref();
214 state.validate_path(path)?;
215
216 let inode = state.next_inode;
217 state.next_inode += 1;
218 state.entries.insert(
219 path.to_path_buf(),
220 FakeFsEntry {
221 metadata: Metadata {
222 inode,
223 mtime: SystemTime::now(),
224 is_dir: true,
225 is_symlink: false,
226 },
227 content: None,
228 },
229 );
230 state.emit_event(&[path]).await;
231 Ok(())
232 }
233
234 pub async fn insert_file(&self, path: impl AsRef<Path>, content: String) -> Result<()> {
235 let mut state = self.state.lock().await;
236 let path = path.as_ref();
237 state.validate_path(path)?;
238
239 let inode = state.next_inode;
240 state.next_inode += 1;
241 state.entries.insert(
242 path.to_path_buf(),
243 FakeFsEntry {
244 metadata: Metadata {
245 inode,
246 mtime: SystemTime::now(),
247 is_dir: false,
248 is_symlink: false,
249 },
250 content: Some(content),
251 },
252 );
253 state.emit_event(&[path]).await;
254 Ok(())
255 }
256
257 #[must_use]
258 pub fn insert_tree<'a>(
259 &'a self,
260 path: impl 'a + AsRef<Path> + Send,
261 tree: serde_json::Value,
262 ) -> futures::future::BoxFuture<'a, ()> {
263 use futures::FutureExt as _;
264 use serde_json::Value::*;
265
266 async move {
267 let path = path.as_ref();
268
269 match tree {
270 Object(map) => {
271 self.insert_dir(path).await.unwrap();
272 for (name, contents) in map {
273 let mut path = PathBuf::from(path);
274 path.push(name);
275 self.insert_tree(&path, contents).await;
276 }
277 }
278 Null => {
279 self.insert_dir(&path).await.unwrap();
280 }
281 String(contents) => {
282 self.insert_file(&path, contents).await.unwrap();
283 }
284 _ => {
285 panic!("JSON object must contain only objects, strings, or null");
286 }
287 }
288 }
289 .boxed()
290 }
291
292 pub async fn remove(&self, path: &Path) -> Result<()> {
293 let mut state = self.state.lock().await;
294 state.validate_path(path)?;
295 state.entries.retain(|path, _| !path.starts_with(path));
296 state.emit_event(&[path]).await;
297 Ok(())
298 }
299
300 pub async fn rename(&self, source: &Path, target: &Path) -> Result<()> {
301 let mut state = self.state.lock().await;
302 state.validate_path(source)?;
303 state.validate_path(target)?;
304 if state.entries.contains_key(target) {
305 Err(anyhow!("target path already exists"))
306 } else {
307 let mut removed = Vec::new();
308 state.entries.retain(|path, entry| {
309 if let Ok(relative_path) = path.strip_prefix(source) {
310 removed.push((relative_path.to_path_buf(), entry.clone()));
311 false
312 } else {
313 true
314 }
315 });
316
317 for (relative_path, entry) in removed {
318 let new_path = target.join(relative_path);
319 state.entries.insert(new_path, entry);
320 }
321
322 state.emit_event(&[source, target]).await;
323 Ok(())
324 }
325 }
326}
327
328#[cfg(any(test, feature = "test-support"))]
329#[async_trait::async_trait]
330impl Fs for FakeFs {
331 async fn load(&self, path: &Path) -> Result<String> {
332 let state = self.state.lock().await;
333 let text = state
334 .entries
335 .get(path)
336 .and_then(|e| e.content.as_ref())
337 .ok_or_else(|| anyhow!("file {:?} does not exist", path))?;
338 Ok(text.clone())
339 }
340
341 async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
342 let mut state = self.state.lock().await;
343 state.validate_path(path)?;
344 if let Some(entry) = state.entries.get_mut(path) {
345 if entry.metadata.is_dir {
346 Err(anyhow!("cannot overwrite a directory with a file"))
347 } else {
348 entry.content = Some(text.chunks().collect());
349 entry.metadata.mtime = SystemTime::now();
350 state.emit_event(&[path]).await;
351 Ok(())
352 }
353 } else {
354 let inode = state.next_inode;
355 state.next_inode += 1;
356 let entry = FakeFsEntry {
357 metadata: Metadata {
358 inode,
359 mtime: SystemTime::now(),
360 is_dir: false,
361 is_symlink: false,
362 },
363 content: Some(text.chunks().collect()),
364 };
365 state.entries.insert(path.to_path_buf(), entry);
366 state.emit_event(&[path]).await;
367 Ok(())
368 }
369 }
370
371 async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
372 Ok(path.to_path_buf())
373 }
374
375 async fn is_file(&self, path: &Path) -> bool {
376 let state = self.state.lock().await;
377 state
378 .entries
379 .get(path)
380 .map_or(false, |entry| !entry.metadata.is_dir)
381 }
382
383 async fn metadata(&self, path: &Path) -> Result<Option<Metadata>> {
384 let state = self.state.lock().await;
385 Ok(state.entries.get(path).map(|entry| entry.metadata.clone()))
386 }
387
388 async fn read_dir(
389 &self,
390 abs_path: &Path,
391 ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>> {
392 use futures::{future, stream};
393 let state = self.state.lock().await;
394 let abs_path = abs_path.to_path_buf();
395 Ok(Box::pin(stream::iter(state.entries.clone()).filter_map(
396 move |(child_path, _)| {
397 future::ready(if child_path.parent() == Some(&abs_path) {
398 Some(Ok(child_path))
399 } else {
400 None
401 })
402 },
403 )))
404 }
405
406 async fn watch(
407 &self,
408 path: &Path,
409 _: Duration,
410 ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
411 let state = self.state.lock().await;
412 let rx = state.events_tx.subscribe();
413 let path = path.to_path_buf();
414 Box::pin(futures::StreamExt::filter(rx, move |events| {
415 let result = events.iter().any(|event| event.path.starts_with(&path));
416 async move { result }
417 }))
418 }
419
420 fn is_fake(&self) -> bool {
421 true
422 }
423
424 #[cfg(any(test, feature = "test-support"))]
425 fn as_fake(&self) -> &FakeFs {
426 self
427 }
428}