1use super::{char_bag::CharBag, char_bag_for_path, Entry, EntryKind, Rope};
2use anyhow::{anyhow, Context, Result};
3use atomic::Ordering::SeqCst;
4use fsevent::EventStream;
5use futures::{future::BoxFuture, Stream, StreamExt};
6use postage::prelude::Sink as _;
7use smol::io::{AsyncReadExt, AsyncWriteExt};
8use std::{
9 io,
10 os::unix::fs::MetadataExt,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{
14 atomic::{self, AtomicUsize},
15 Arc,
16 },
17 time::{Duration, SystemTime},
18};
19
20#[async_trait::async_trait]
21pub trait Fs: Send + Sync {
22 async fn entry(
23 &self,
24 root_char_bag: CharBag,
25 next_entry_id: &AtomicUsize,
26 path: Arc<Path>,
27 abs_path: &Path,
28 ) -> Result<Option<Entry>>;
29 async fn child_entries<'a>(
30 &self,
31 root_char_bag: CharBag,
32 next_entry_id: &'a AtomicUsize,
33 path: &'a Path,
34 abs_path: &'a Path,
35 ) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>>;
36 async fn load(&self, path: &Path) -> Result<String>;
37 async fn save(&self, path: &Path, text: &Rope) -> Result<()>;
38 async fn canonicalize(&self, path: &Path) -> Result<PathBuf>;
39 async fn is_file(&self, path: &Path) -> bool;
40 async fn watch(
41 &self,
42 path: &Path,
43 latency: Duration,
44 ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>>;
45 fn is_fake(&self) -> bool;
46}
47
/// [`Fs`] implementation that operates on the actual file system through `smol::fs`.
pub struct RealFs;
49
50#[async_trait::async_trait]
51impl Fs for RealFs {
52 async fn entry(
53 &self,
54 root_char_bag: CharBag,
55 next_entry_id: &AtomicUsize,
56 path: Arc<Path>,
57 abs_path: &Path,
58 ) -> Result<Option<Entry>> {
59 let metadata = match smol::fs::metadata(&abs_path).await {
60 Err(err) => {
61 return match (err.kind(), err.raw_os_error()) {
62 (io::ErrorKind::NotFound, _) => Ok(None),
63 (io::ErrorKind::Other, Some(libc::ENOTDIR)) => Ok(None),
64 _ => Err(anyhow::Error::new(err)),
65 }
66 }
67 Ok(metadata) => metadata,
68 };
69 let inode = metadata.ino();
70 let mtime = metadata.modified()?;
71 let is_symlink = smol::fs::symlink_metadata(&abs_path)
72 .await
73 .context("failed to read symlink metadata")?
74 .file_type()
75 .is_symlink();
76
77 let entry = Entry {
78 id: next_entry_id.fetch_add(1, SeqCst),
79 kind: if metadata.file_type().is_dir() {
80 EntryKind::PendingDir
81 } else {
82 EntryKind::File(char_bag_for_path(root_char_bag, &path))
83 },
84 path: Arc::from(path),
85 inode,
86 mtime,
87 is_symlink,
88 is_ignored: false,
89 };
90
91 Ok(Some(entry))
92 }
93
94 async fn child_entries<'a>(
95 &self,
96 root_char_bag: CharBag,
97 next_entry_id: &'a AtomicUsize,
98 path: &'a Path,
99 abs_path: &'a Path,
100 ) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>> {
101 let entries = smol::fs::read_dir(abs_path).await?;
102 Ok(entries
103 .then(move |entry| async move {
104 let child_entry = entry?;
105 let child_name = child_entry.file_name();
106 let child_path: Arc<Path> = path.join(&child_name).into();
107 let child_abs_path = abs_path.join(&child_name);
108 let child_is_symlink = child_entry.metadata().await?.file_type().is_symlink();
109 let child_metadata = smol::fs::metadata(child_abs_path).await?;
110 let child_inode = child_metadata.ino();
111 let child_mtime = child_metadata.modified()?;
112 Ok(Entry {
113 id: next_entry_id.fetch_add(1, SeqCst),
114 kind: if child_metadata.file_type().is_dir() {
115 EntryKind::PendingDir
116 } else {
117 EntryKind::File(char_bag_for_path(root_char_bag, &child_path))
118 },
119 path: child_path,
120 inode: child_inode,
121 mtime: child_mtime,
122 is_symlink: child_is_symlink,
123 is_ignored: false,
124 })
125 })
126 .boxed())
127 }
128
129 async fn load(&self, path: &Path) -> Result<String> {
130 let mut file = smol::fs::File::open(path).await?;
131 let mut text = String::new();
132 file.read_to_string(&mut text).await?;
133 Ok(text)
134 }
135
136 async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
137 let buffer_size = text.summary().bytes.min(10 * 1024);
138 let file = smol::fs::File::create(path).await?;
139 let mut writer = smol::io::BufWriter::with_capacity(buffer_size, file);
140 for chunk in text.chunks() {
141 writer.write_all(chunk.as_bytes()).await?;
142 }
143 writer.flush().await?;
144 Ok(())
145 }
146
147 async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
148 Ok(smol::fs::canonicalize(path).await?)
149 }
150
151 async fn is_file(&self, path: &Path) -> bool {
152 smol::fs::metadata(path)
153 .await
154 .map_or(false, |metadata| metadata.is_file())
155 }
156
157 async fn watch(
158 &self,
159 path: &Path,
160 latency: Duration,
161 ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
162 let (mut tx, rx) = postage::mpsc::channel(64);
163 let (stream, handle) = EventStream::new(&[path], latency);
164 std::mem::forget(handle);
165 std::thread::spawn(move || {
166 stream.run(move |events| smol::block_on(tx.send(events)).is_ok());
167 });
168 Box::pin(rx)
169 }
170
171 fn is_fake(&self) -> bool {
172 false
173 }
174}
175
/// A single file or directory stored by the in-memory fake file system.
///
/// Gated like `FakeFsState` and `FakeFs`, the only users of this type, so that it is not
/// compiled as dead code in builds without `test` / `test-support`.
#[cfg(any(test, feature = "test-support"))]
#[derive(Clone, Debug)]
struct FakeFsEntry {
    /// Inode number reported for this entry.
    inode: u64,
    /// Modification time reported for this entry.
    mtime: SystemTime,
    /// `true` for directories, `false` for files.
    is_dir: bool,
    /// Whether the entry is reported as a symlink.
    is_symlink: bool,
    /// File contents; `None` for directories.
    content: Option<String>,
}
184
/// Mutable state of the fake file system, kept behind the lock in `FakeFs`.
#[cfg(any(test, feature = "test-support"))]
struct FakeFsState {
    /// All entries, keyed by absolute path and kept in path order.
    entries: std::collections::BTreeMap<PathBuf, FakeFsEntry>,
    /// Inode number handed to the next entry that gets created.
    next_inode: u64,
    /// Broadcast sender on which change events are published.
    events_tx: postage::broadcast::Sender<Vec<fsevent::Event>>,
}
191
#[cfg(any(test, feature = "test-support"))]
impl FakeFsState {
    /// Accepts `path` only if it is absolute and its parent is an existing directory.
    ///
    /// The path itself is not required to exist.
    fn validate_path(&self, path: &Path) -> Result<()> {
        let parent_is_dir = path
            .parent()
            .and_then(|parent| self.entries.get(parent))
            .map_or(false, |parent_entry| parent_entry.is_dir);

        if !path.is_absolute() || !parent_is_dir {
            return Err(anyhow!("invalid path {:?}", path));
        }
        Ok(())
    }

    /// Publishes one batch containing an event (id 0, empty flags) for each of `paths`.
    async fn emit_event(&mut self, paths: &[&Path]) {
        let mut events = Vec::with_capacity(paths.len());
        for path in paths {
            events.push(fsevent::Event {
                event_id: 0,
                flags: fsevent::StreamFlags::empty(),
                path: path.to_path_buf(),
            });
        }

        // The result of the send is intentionally ignored.
        let _ = self.events_tx.send(events).await;
    }
}
220
/// In-memory [`Fs`] implementation, compiled only for tests or with the `test-support`
/// feature.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeFs {
    // Use an unfair lock to ensure tests are deterministic.
    state: futures::lock::Mutex<FakeFsState>,
}
226
#[cfg(any(test, feature = "test-support"))]
impl FakeFs {
    /// Creates a fake file system that contains only the root directory `/`.
    pub fn new() -> Self {
        let (events_tx, _) = postage::broadcast::channel(2048);
        let mut entries = std::collections::BTreeMap::new();
        entries.insert(
            PathBuf::from("/"),
            FakeFsEntry {
                inode: 0,
                mtime: SystemTime::now(),
                is_dir: true,
                is_symlink: false,
                content: None,
            },
        );
        Self {
            state: futures::lock::Mutex::new(FakeFsState {
                entries,
                next_inode: 1,
                events_tx,
            }),
        }
    }

    /// Inserts (or replaces) an empty directory at `path` and emits an event for it.
    ///
    /// # Errors
    /// Fails if `path` is not absolute or its parent is not an existing directory.
    pub async fn insert_dir(&self, path: impl AsRef<Path>) -> Result<()> {
        self.insert_entry(path.as_ref(), true, None).await
    }

    /// Inserts (or replaces) a file at `path` holding `content` and emits an event for it.
    ///
    /// # Errors
    /// Fails if `path` is not absolute or its parent is not an existing directory.
    pub async fn insert_file(&self, path: impl AsRef<Path>, content: String) -> Result<()> {
        self.insert_entry(path.as_ref(), false, Some(content)).await
    }

    /// Shared implementation of `insert_dir` / `insert_file`: validates `path`, allocates a
    /// fresh inode, stores the entry and emits an event for `path`.
    async fn insert_entry(
        &self,
        path: &Path,
        is_dir: bool,
        content: Option<String>,
    ) -> Result<()> {
        let mut state = self.state.lock().await;
        state.validate_path(path)?;

        let inode = state.next_inode;
        state.next_inode += 1;
        state.entries.insert(
            path.to_path_buf(),
            FakeFsEntry {
                inode,
                mtime: SystemTime::now(),
                is_dir,
                is_symlink: false,
                content,
            },
        );
        state.emit_event(&[path]).await;
        Ok(())
    }

    /// Recursively materializes a JSON description of a directory tree at `path`.
    ///
    /// Objects become directories whose keys are child names, `null` becomes an empty
    /// directory and strings become files with that content.
    ///
    /// # Panics
    /// Panics if an insertion fails or if `tree` contains any other kind of JSON value.
    #[must_use]
    pub fn insert_tree<'a>(
        &'a self,
        path: impl 'a + AsRef<Path> + Send,
        tree: serde_json::Value,
    ) -> BoxFuture<'a, ()> {
        use futures::FutureExt as _;
        use serde_json::Value::*;

        // Boxed because the future is recursive.
        async move {
            let path = path.as_ref();

            match tree {
                Object(map) => {
                    self.insert_dir(path).await.unwrap();
                    for (name, contents) in map {
                        let mut child_path = PathBuf::from(path);
                        child_path.push(name);
                        self.insert_tree(&child_path, contents).await;
                    }
                }
                Null => {
                    self.insert_dir(&path).await.unwrap();
                }
                String(contents) => {
                    self.insert_file(&path, contents).await.unwrap();
                }
                _ => {
                    panic!("JSON object must contain only objects, strings, or null");
                }
            }
        }
        .boxed()
    }

    /// Removes `path` and everything beneath it, then emits an event for `path`.
    ///
    /// # Errors
    /// Fails if `path` is not absolute or its parent is not an existing directory.
    pub async fn remove(&self, path: &Path) -> Result<()> {
        let mut state = self.state.lock().await;
        state.validate_path(path)?;
        // The closure parameter must not be called `path`: shadowing the argument would make
        // the prefix test compare each key with itself and wipe every entry.
        state
            .entries
            .retain(|entry_path, _| !entry_path.starts_with(path));
        state.emit_event(&[path]).await;
        Ok(())
    }

    /// Moves `source` and everything beneath it to `target`, then emits an event covering
    /// both paths.
    ///
    /// # Errors
    /// Fails if either path is invalid (not absolute, or parent not an existing directory)
    /// or if an entry already exists at `target`.
    pub async fn rename(&self, source: &Path, target: &Path) -> Result<()> {
        let mut state = self.state.lock().await;
        state.validate_path(source)?;
        state.validate_path(target)?;
        if state.entries.contains_key(target) {
            return Err(anyhow!("target path already exists"));
        }

        // Pull out every entry at or below `source`, remembering its path relative to it.
        let mut moved = Vec::new();
        state.entries.retain(|entry_path, entry| {
            match entry_path.strip_prefix(source) {
                Ok(relative_path) => {
                    moved.push((relative_path.to_path_buf(), entry.clone()));
                    false
                }
                Err(_) => true,
            }
        });

        // Re-insert the extracted entries under `target`.
        for (relative_path, entry) in moved {
            state.entries.insert(target.join(relative_path), entry);
        }

        state.emit_event(&[source, target]).await;
        Ok(())
    }
}
363
#[cfg(any(test, feature = "test-support"))]
#[async_trait::async_trait]
impl Fs for FakeFs {
    /// Builds an [`Entry`] from the fake entry stored at `abs_path`, or returns `Ok(None)`
    /// if there is none.
    async fn entry(
        &self,
        root_char_bag: CharBag,
        next_entry_id: &AtomicUsize,
        path: Arc<Path>,
        abs_path: &Path,
    ) -> Result<Option<Entry>> {
        let state = self.state.lock().await;
        let fake_entry = match state.entries.get(abs_path) {
            Some(fake_entry) => fake_entry,
            None => return Ok(None),
        };

        let id = next_entry_id.fetch_add(1, SeqCst);
        let kind = if fake_entry.is_dir {
            EntryKind::PendingDir
        } else {
            EntryKind::File(char_bag_for_path(root_char_bag, &path))
        };
        Ok(Some(Entry {
            id,
            kind,
            path,
            inode: fake_entry.inode,
            mtime: fake_entry.mtime,
            is_symlink: fake_entry.is_symlink,
            is_ignored: false,
        }))
    }

    /// Streams an [`Entry`] for every stored entry whose parent is exactly `abs_path`.
    ///
    /// The children are snapshotted while the lock is held, so later changes to the fake
    /// file system are not reflected in a stream that has already been returned.
    async fn child_entries<'a>(
        &self,
        root_char_bag: CharBag,
        next_entry_id: &'a AtomicUsize,
        path: &'a Path,
        abs_path: &'a Path,
    ) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>> {
        use futures::stream;

        let state = self.state.lock().await;
        // Filter before cloning: only the direct children are copied, rather than the whole
        // map together with every file's contents.
        let children: Vec<(PathBuf, FakeFsEntry)> = state
            .entries
            .iter()
            .filter(|(child_abs_path, _)| child_abs_path.parent() == Some(abs_path))
            .map(|(child_abs_path, child_entry)| (child_abs_path.clone(), child_entry.clone()))
            .collect();

        Ok(stream::iter(children)
            .then(move |(child_abs_path, child_entry)| async move {
                // Yield to the executor between children.
                smol::future::yield_now().await;
                let child_path: Arc<Path> =
                    Arc::from(path.join(child_abs_path.file_name().unwrap()));
                Ok(Entry {
                    id: next_entry_id.fetch_add(1, SeqCst),
                    kind: if child_entry.is_dir {
                        EntryKind::PendingDir
                    } else {
                        EntryKind::File(char_bag_for_path(root_char_bag, &child_path))
                    },
                    path: child_path,
                    inode: child_entry.inode,
                    mtime: child_entry.mtime,
                    is_symlink: child_entry.is_symlink,
                    is_ignored: false,
                })
            })
            .boxed())
    }

    /// Returns a copy of the content stored at `path`.
    ///
    /// # Errors
    /// Fails if there is no entry at `path` or the entry has no content.
    async fn load(&self, path: &Path) -> Result<String> {
        let state = self.state.lock().await;
        state
            .entries
            .get(path)
            .and_then(|entry| entry.content.clone())
            .ok_or_else(|| anyhow!("file {:?} does not exist", path))
    }

    /// Stores `text` as the content of the file at `path`, creating the file if necessary,
    /// and emits an event for `path`.
    ///
    /// # Errors
    /// Fails if `path` is invalid or refers to an existing directory.
    async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
        let mut state = self.state.lock().await;
        state.validate_path(path)?;
        if let Some(entry) = state.entries.get_mut(path) {
            if entry.is_dir {
                return Err(anyhow!("cannot overwrite a directory with a file"));
            }
            entry.content = Some(text.chunks().collect());
            entry.mtime = SystemTime::now();
        } else {
            let inode = state.next_inode;
            state.next_inode += 1;
            state.entries.insert(
                path.to_path_buf(),
                FakeFsEntry {
                    inode,
                    mtime: SystemTime::now(),
                    is_dir: false,
                    is_symlink: false,
                    content: Some(text.chunks().collect()),
                },
            );
        }
        state.emit_event(&[path]).await;
        Ok(())
    }

    /// Returns `path` unchanged.
    async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
        Ok(path.to_path_buf())
    }

    /// Returns `true` if an entry exists at `path` and is not a directory.
    async fn is_file(&self, path: &Path) -> bool {
        let state = self.state.lock().await;
        state.entries.get(path).map_or(false, |entry| !entry.is_dir)
    }

    /// Subscribes to the fake file system's events, yielding only those batches in which at
    /// least one event path starts with `path`. The latency argument is ignored.
    async fn watch(
        &self,
        path: &Path,
        _: Duration,
    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
        let state = self.state.lock().await;
        let rx = state.events_tx.subscribe();
        let watched_path = path.to_path_buf();
        Box::pin(futures::StreamExt::filter(rx, move |events| {
            let is_relevant = events
                .iter()
                .any(|event| event.path.starts_with(&watched_path));
            async move { is_relevant }
        }))
    }

    /// Always `true`: this is the in-memory fake.
    fn is_fake(&self) -> bool {
        true
    }
}