1use super::{char_bag::CharBag, char_bag_for_path, Entry, EntryKind, Rope};
2use anyhow::{anyhow, Context, Result};
3use atomic::Ordering::SeqCst;
4use fsevent::EventStream;
5use futures::{future::BoxFuture, Stream, StreamExt};
6use postage::prelude::Sink as _;
7use smol::io::{AsyncReadExt, AsyncWriteExt};
8use std::{
9 io,
10 os::unix::fs::MetadataExt,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{
14 atomic::{self, AtomicUsize},
15 Arc,
16 },
17 time::{Duration, SystemTime},
18};
19
20#[async_trait::async_trait]
21pub trait Fs: Send + Sync {
22 async fn entry(
23 &self,
24 root_char_bag: CharBag,
25 next_entry_id: &AtomicUsize,
26 path: Arc<Path>,
27 abs_path: &Path,
28 ) -> Result<Option<Entry>>;
29 async fn child_entries<'a>(
30 &self,
31 root_char_bag: CharBag,
32 next_entry_id: &'a AtomicUsize,
33 path: &'a Path,
34 abs_path: &'a Path,
35 ) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>>;
36 async fn load(&self, path: &Path) -> Result<String>;
37 async fn save(&self, path: &Path, text: &Rope) -> Result<()>;
38 async fn canonicalize(&self, path: &Path) -> Result<PathBuf>;
39 async fn is_file(&self, path: &Path) -> bool;
40 async fn watch(
41 &self,
42 path: &Path,
43 latency: Duration,
44 ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>>;
45 fn is_fake(&self) -> bool;
46}
47
48pub struct RealFs;
49
50#[async_trait::async_trait]
51impl Fs for RealFs {
52 async fn entry(
53 &self,
54 root_char_bag: CharBag,
55 next_entry_id: &AtomicUsize,
56 path: Arc<Path>,
57 abs_path: &Path,
58 ) -> Result<Option<Entry>> {
59 let metadata = match smol::fs::metadata(&abs_path).await {
60 Err(err) => {
61 return match (err.kind(), err.raw_os_error()) {
62 (io::ErrorKind::NotFound, _) => Ok(None),
63 (io::ErrorKind::Other, Some(libc::ENOTDIR)) => Ok(None),
64 _ => Err(anyhow::Error::new(err)),
65 }
66 }
67 Ok(metadata) => metadata,
68 };
69 let inode = metadata.ino();
70 let mtime = metadata.modified()?;
71 let is_symlink = smol::fs::symlink_metadata(&abs_path)
72 .await
73 .context("failed to read symlink metadata")?
74 .file_type()
75 .is_symlink();
76
77 let entry = Entry {
78 id: next_entry_id.fetch_add(1, SeqCst),
79 kind: if metadata.file_type().is_dir() {
80 EntryKind::PendingDir
81 } else {
82 EntryKind::File(char_bag_for_path(root_char_bag, &path))
83 },
84 path: Arc::from(path),
85 inode,
86 mtime,
87 is_symlink,
88 is_ignored: false,
89 };
90
91 Ok(Some(entry))
92 }
93
94 async fn child_entries<'a>(
95 &self,
96 root_char_bag: CharBag,
97 next_entry_id: &'a AtomicUsize,
98 path: &'a Path,
99 abs_path: &'a Path,
100 ) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>> {
101 let entries = smol::fs::read_dir(abs_path).await?;
102 Ok(entries
103 .then(move |entry| async move {
104 let child_entry = entry?;
105 let child_name = child_entry.file_name();
106 let child_path: Arc<Path> = path.join(&child_name).into();
107 let child_abs_path = abs_path.join(&child_name);
108 let child_is_symlink = child_entry.metadata().await?.file_type().is_symlink();
109 let child_metadata = smol::fs::metadata(child_abs_path).await?;
110 let child_inode = child_metadata.ino();
111 let child_mtime = child_metadata.modified()?;
112 Ok(Entry {
113 id: next_entry_id.fetch_add(1, SeqCst),
114 kind: if child_metadata.file_type().is_dir() {
115 EntryKind::PendingDir
116 } else {
117 EntryKind::File(char_bag_for_path(root_char_bag, &child_path))
118 },
119 path: child_path,
120 inode: child_inode,
121 mtime: child_mtime,
122 is_symlink: child_is_symlink,
123 is_ignored: false,
124 })
125 })
126 .boxed())
127 }
128
129 async fn load(&self, path: &Path) -> Result<String> {
130 let mut file = smol::fs::File::open(path).await?;
131 let mut text = String::new();
132 file.read_to_string(&mut text).await?;
133 Ok(text)
134 }
135
136 async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
137 let buffer_size = text.summary().bytes.min(10 * 1024);
138 let file = smol::fs::File::create(path).await?;
139 let mut writer = smol::io::BufWriter::with_capacity(buffer_size, file);
140 for chunk in text.chunks() {
141 writer.write_all(chunk.as_bytes()).await?;
142 }
143 writer.flush().await?;
144 Ok(())
145 }
146
147 async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
148 Ok(smol::fs::canonicalize(path).await?)
149 }
150
151 async fn is_file(&self, path: &Path) -> bool {
152 smol::fs::metadata(path)
153 .await
154 .map_or(false, |metadata| metadata.is_file())
155 }
156
157 async fn watch(
158 &self,
159 path: &Path,
160 latency: Duration,
161 ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
162 let (mut tx, rx) = postage::mpsc::channel(64);
163 let (stream, handle) = EventStream::new(&[path], latency);
164 std::mem::forget(handle);
165 std::thread::spawn(move || {
166 stream.run(move |events| smol::block_on(tx.send(events)).is_ok());
167 });
168 Box::pin(rx)
169 }
170
171 fn is_fake(&self) -> bool {
172 false
173 }
174}
175
/// A single file or directory stored by the in-memory fake file system.
///
/// Gated like the rest of the fake file system so that it is not compiled
/// (and flagged as dead code) in builds without tests or `test-support`.
#[cfg(any(test, feature = "test-support"))]
#[derive(Clone, Debug)]
struct FakeFsEntry {
    /// Inode number reported for this entry.
    inode: u64,
    /// Modification time reported for this entry.
    mtime: SystemTime,
    /// `true` for directories, `false` for files.
    is_dir: bool,
    /// Whether the entry is reported as a symlink.
    is_symlink: bool,
    /// File contents; `None` for directories.
    content: Option<String>,
}
184
/// Mutable state of the fake file system, kept behind the `FakeFs` mutex.
#[cfg(any(test, feature = "test-support"))]
struct FakeFsState {
    /// Every known entry, keyed by absolute path.
    entries: std::collections::BTreeMap<PathBuf, FakeFsEntry>,
    /// Inode number to hand out to the next created entry.
    next_inode: u64,
    /// Broadcast channel on which change events are published.
    events_tx: postage::broadcast::Sender<Vec<fsevent::Event>>,
}

#[cfg(any(test, feature = "test-support"))]
impl FakeFsState {
    /// Accepts `path` only if it is absolute and its parent is a known
    /// directory; otherwise returns an "invalid path" error.
    fn validate_path(&self, path: &Path) -> Result<()> {
        let parent_is_dir = path
            .parent()
            .and_then(|parent| self.entries.get(parent))
            .map_or(false, |parent_entry| parent_entry.is_dir);

        if path.is_absolute() && parent_is_dir {
            Ok(())
        } else {
            Err(anyhow!("invalid path {:?}", path))
        }
    }

    /// Publishes one batch containing an event (id 0, no flags) for each of
    /// `paths`.
    async fn emit_event(&mut self, paths: &[&Path]) {
        let mut batch = Vec::with_capacity(paths.len());
        for changed in paths {
            batch.push(fsevent::Event {
                event_id: 0,
                flags: fsevent::StreamFlags::empty(),
                path: changed.to_path_buf(),
            });
        }

        // Send failures are deliberately ignored.
        let _ = self.events_tx.send(batch).await;
    }
}
220
/// In-memory file system used in tests.
///
/// All entries live in a map keyed by absolute path; every mutation publishes
/// an event batch that watchers can observe.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeFs {
    // Use an unfair lock to ensure tests are deterministic.
    state: futures::lock::Mutex<FakeFsState>,
}

#[cfg(any(test, feature = "test-support"))]
impl FakeFs {
    /// Creates a fake file system containing only the root directory `/`
    /// (inode 0). Subsequent entries receive inodes starting at 1.
    pub fn new() -> Self {
        let (events_tx, _) = postage::broadcast::channel(2048);
        let mut entries = std::collections::BTreeMap::new();
        entries.insert(
            Path::new("/").to_path_buf(),
            FakeFsEntry {
                inode: 0,
                mtime: SystemTime::now(),
                is_dir: true,
                is_symlink: false,
                content: None,
            },
        );
        Self {
            state: futures::lock::Mutex::new(FakeFsState {
                entries,
                next_inode: 1,
                events_tx,
            }),
        }
    }

    /// Inserts (or replaces) a directory at `path` and emits an event for it.
    ///
    /// # Errors
    /// Fails if `path` is not absolute or its parent is not a known directory.
    pub async fn insert_dir(&self, path: impl AsRef<Path>) -> Result<()> {
        let mut state = self.state.lock().await;
        let path = path.as_ref();
        state.validate_path(path)?;

        let inode = state.next_inode;
        state.next_inode += 1;
        state.entries.insert(
            path.to_path_buf(),
            FakeFsEntry {
                inode,
                mtime: SystemTime::now(),
                is_dir: true,
                is_symlink: false,
                content: None,
            },
        );
        state.emit_event(&[path]).await;
        Ok(())
    }

    /// Inserts (or replaces) a file at `path` holding `content` and emits an
    /// event for it.
    ///
    /// # Errors
    /// Fails if `path` is not absolute or its parent is not a known directory.
    pub async fn insert_file(&self, path: impl AsRef<Path>, content: String) -> Result<()> {
        let mut state = self.state.lock().await;
        let path = path.as_ref();
        state.validate_path(path)?;

        let inode = state.next_inode;
        state.next_inode += 1;
        state.entries.insert(
            path.to_path_buf(),
            FakeFsEntry {
                inode,
                mtime: SystemTime::now(),
                is_dir: false,
                is_symlink: false,
                content: Some(content),
            },
        );
        state.emit_event(&[path]).await;
        Ok(())
    }

    /// Recursively materializes a JSON description at `path`: objects become
    /// directories (keys are child names), `null` becomes an empty directory
    /// and strings become files with that content.
    ///
    /// # Panics
    /// Panics if an insertion fails or if `tree` contains any other kind of
    /// JSON value.
    pub fn insert_tree<'a>(
        &'a self,
        path: impl 'a + AsRef<Path> + Send,
        tree: serde_json::Value,
    ) -> BoxFuture<'a, ()> {
        use futures::FutureExt as _;
        use serde_json::Value::*;

        // Boxed because the future is recursive.
        async move {
            let path = path.as_ref();

            match tree {
                Object(map) => {
                    self.insert_dir(path).await.unwrap();
                    for (name, contents) in map {
                        let mut path = PathBuf::from(path);
                        path.push(name);
                        self.insert_tree(&path, contents).await;
                    }
                }
                Null => {
                    self.insert_dir(&path).await.unwrap();
                }
                String(contents) => {
                    self.insert_file(&path, contents).await.unwrap();
                }
                _ => {
                    panic!("JSON object must contain only objects, strings, or null");
                }
            }
        }
        .boxed()
    }

    /// Removes the entry at `path` together with everything beneath it and
    /// emits an event for `path`.
    ///
    /// # Errors
    /// Fails if `path` is not absolute or its parent is not a known directory.
    pub async fn remove(&self, path: &Path) -> Result<()> {
        let mut state = self.state.lock().await;
        state.validate_path(path)?;
        // The closure parameter must not shadow `path`: comparing an entry's
        // path with itself would always match and wipe the whole file system.
        state
            .entries
            .retain(|entry_path, _| !entry_path.starts_with(path));
        state.emit_event(&[path]).await;
        Ok(())
    }

    /// Moves `source` and everything beneath it to `target`, then emits a
    /// single batch with events for both paths.
    ///
    /// # Errors
    /// Fails if either path is invalid (see `insert_dir`) or if `target`
    /// already exists.
    pub async fn rename(&self, source: &Path, target: &Path) -> Result<()> {
        let mut state = self.state.lock().await;
        state.validate_path(source)?;
        state.validate_path(target)?;
        if state.entries.contains_key(target) {
            Err(anyhow!("target path already exists"))
        } else {
            // Detach every entry under `source`, remembering its path
            // relative to `source` so it can be re-rooted under `target`.
            let mut removed = Vec::new();
            state.entries.retain(|path, entry| {
                if let Ok(relative_path) = path.strip_prefix(source) {
                    removed.push((relative_path.to_path_buf(), entry.clone()));
                    false
                } else {
                    true
                }
            });

            for (relative_path, entry) in removed {
                let new_path = target.join(relative_path);
                state.entries.insert(new_path, entry);
            }

            state.emit_event(&[source, target]).await;
            Ok(())
        }
    }
}
362
#[cfg(any(test, feature = "test-support"))]
#[async_trait::async_trait]
impl Fs for FakeFs {
    /// Looks up `abs_path` in the in-memory map and converts the stored
    /// entry into an [`Entry`]; yields `Ok(None)` when the path is unknown.
    async fn entry(
        &self,
        root_char_bag: CharBag,
        next_entry_id: &AtomicUsize,
        path: Arc<Path>,
        abs_path: &Path,
    ) -> Result<Option<Entry>> {
        let state = self.state.lock().await;
        let found = match state.entries.get(abs_path) {
            Some(found) => found,
            None => return Ok(None),
        };

        // An id is only consumed once the entry is known to exist.
        let id = next_entry_id.fetch_add(1, SeqCst);
        let kind = if found.is_dir {
            EntryKind::PendingDir
        } else {
            EntryKind::File(char_bag_for_path(root_char_bag, &path))
        };
        Ok(Some(Entry {
            id,
            kind,
            path: Arc::from(path),
            inode: found.inode,
            mtime: found.mtime,
            is_symlink: found.is_symlink,
            is_ignored: false,
        }))
    }

    /// Streams an [`Entry`] for each stored path whose parent is exactly
    /// `abs_path`, working from a snapshot (clone) of the entry map.
    async fn child_entries<'a>(
        &self,
        root_char_bag: CharBag,
        next_entry_id: &'a AtomicUsize,
        path: &'a Path,
        abs_path: &'a Path,
    ) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>> {
        use futures::{future, stream};

        let state = self.state.lock().await;
        let snapshot = state.entries.clone();
        let children = stream::iter(snapshot)
            .filter(move |(candidate, _)| future::ready(candidate.parent() == Some(abs_path)))
            .then(move |(child_abs_path, fake)| async move {
                // Yield between items, as the original implementation does.
                smol::future::yield_now().await;
                let child_path: Arc<Path> =
                    Arc::from(path.join(child_abs_path.file_name().unwrap()));
                let id = next_entry_id.fetch_add(1, SeqCst);
                let kind = if fake.is_dir {
                    EntryKind::PendingDir
                } else {
                    EntryKind::File(char_bag_for_path(root_char_bag, &child_path))
                };
                Ok(Entry {
                    id,
                    kind,
                    path: child_path,
                    inode: fake.inode,
                    mtime: fake.mtime,
                    is_symlink: fake.is_symlink,
                    is_ignored: false,
                })
            })
            .boxed();
        Ok(children)
    }

    /// Returns a copy of the stored content for `path`; errors if the path
    /// is unknown or has no content (i.e. it is a directory).
    async fn load(&self, path: &Path) -> Result<String> {
        let state = self.state.lock().await;
        let stored = state
            .entries
            .get(path)
            .and_then(|entry| entry.content.as_ref());
        match stored {
            Some(text) => Ok(text.clone()),
            None => Err(anyhow!("file {:?} does not exist", path)),
        }
    }

    /// Stores `text` at `path`, creating the file if needed, and emits an
    /// event for `path`. Refuses to overwrite a directory.
    async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
        let mut state = self.state.lock().await;
        state.validate_path(path)?;

        match state.entries.get_mut(path) {
            Some(existing) if existing.is_dir => {
                return Err(anyhow!("cannot overwrite a directory with a file"));
            }
            Some(existing) => {
                existing.content = Some(text.chunks().collect());
                existing.mtime = SystemTime::now();
            }
            None => {
                let inode = state.next_inode;
                state.next_inode += 1;
                let created = FakeFsEntry {
                    inode,
                    mtime: SystemTime::now(),
                    is_dir: false,
                    is_symlink: false,
                    content: Some(text.chunks().collect()),
                };
                state.entries.insert(path.to_path_buf(), created);
            }
        }

        state.emit_event(&[path]).await;
        Ok(())
    }

    /// The fake file system performs no resolution: `path` is returned as-is.
    async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
        Ok(path.to_path_buf())
    }

    /// Returns `true` if `path` is known and is not a directory.
    async fn is_file(&self, path: &Path) -> bool {
        let state = self.state.lock().await;
        match state.entries.get(path) {
            Some(entry) => !entry.is_dir,
            None => false,
        }
    }

    /// Subscribes to the broadcast of emitted events and keeps only the
    /// batches in which at least one event path starts with `path`. The
    /// latency argument is ignored.
    async fn watch(
        &self,
        path: &Path,
        _: Duration,
    ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
        let state = self.state.lock().await;
        let receiver = state.events_tx.subscribe();
        let watched = path.to_path_buf();
        Box::pin(futures::StreamExt::filter(receiver, move |batch| {
            let relevant = batch.iter().any(|event| event.path.starts_with(&watched));
            futures::future::ready(relevant)
        }))
    }

    /// Always `true`: this is the in-memory fake.
    fn is_fake(&self) -> bool {
        true
    }
}