1use anyhow::{anyhow, Result};
2use fsevent::EventStream;
3use futures::{Stream, StreamExt};
4use postage::prelude::Sink as _;
5use smol::io::{AsyncReadExt, AsyncWriteExt};
6use std::{
7 io,
8 os::unix::fs::MetadataExt,
9 path::{Path, PathBuf},
10 pin::Pin,
11 time::{Duration, SystemTime},
12};
13use text::Rope;
14
15#[async_trait::async_trait]
16pub trait Fs: Send + Sync {
17 async fn load(&self, path: &Path) -> Result<String>;
18 async fn save(&self, path: &Path, text: &Rope) -> Result<()>;
19 async fn canonicalize(&self, path: &Path) -> Result<PathBuf>;
20 async fn is_file(&self, path: &Path) -> bool;
21 async fn metadata(&self, path: &Path) -> Result<Option<Metadata>>;
22 async fn read_dir(
23 &self,
24 path: &Path,
25 ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>>;
26 async fn watch(
27 &self,
28 path: &Path,
29 latency: Duration,
30 ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>>;
31 fn is_fake(&self) -> bool;
32 #[cfg(any(test, feature = "test-support"))]
33 fn as_fake(&self) -> &FakeFs;
34}
35
36#[derive(Clone, Debug)]
37pub struct Metadata {
38 pub inode: u64,
39 pub mtime: SystemTime,
40 pub is_symlink: bool,
41 pub is_dir: bool,
42}
43
44pub struct RealFs;
45
46#[async_trait::async_trait]
47impl Fs for RealFs {
48 async fn load(&self, path: &Path) -> Result<String> {
49 let mut file = smol::fs::File::open(path).await?;
50 let mut text = String::new();
51 file.read_to_string(&mut text).await?;
52 Ok(text)
53 }
54
55 async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
56 let buffer_size = text.summary().bytes.min(10 * 1024);
57 let file = smol::fs::File::create(path).await?;
58 let mut writer = smol::io::BufWriter::with_capacity(buffer_size, file);
59 for chunk in text.chunks() {
60 writer.write_all(chunk.as_bytes()).await?;
61 }
62 writer.flush().await?;
63 Ok(())
64 }
65
66 async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
67 Ok(smol::fs::canonicalize(path).await?)
68 }
69
70 async fn is_file(&self, path: &Path) -> bool {
71 smol::fs::metadata(path)
72 .await
73 .map_or(false, |metadata| metadata.is_file())
74 }
75
76 async fn metadata(&self, path: &Path) -> Result<Option<Metadata>> {
77 let symlink_metadata = match smol::fs::symlink_metadata(path).await {
78 Ok(metadata) => metadata,
79 Err(err) => {
80 return match (err.kind(), err.raw_os_error()) {
81 (io::ErrorKind::NotFound, _) => Ok(None),
82 (io::ErrorKind::Other, Some(libc::ENOTDIR)) => Ok(None),
83 _ => Err(anyhow::Error::new(err)),
84 }
85 }
86 };
87
88 let is_symlink = symlink_metadata.file_type().is_symlink();
89 let metadata = if is_symlink {
90 smol::fs::metadata(path).await?
91 } else {
92 symlink_metadata
93 };
94 Ok(Some(Metadata {
95 inode: metadata.ino(),
96 mtime: metadata.modified().unwrap(),
97 is_symlink,
98 is_dir: metadata.file_type().is_dir(),
99 }))
100 }
101
102 async fn read_dir(
103 &self,
104 path: &Path,
105 ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>> {
106 let result = smol::fs::read_dir(path).await?.map(|entry| match entry {
107 Ok(entry) => Ok(entry.path()),
108 Err(error) => Err(anyhow!("failed to read dir entry {:?}", error)),
109 });
110 Ok(Box::pin(result))
111 }
112
113 async fn watch(
114 &self,
115 path: &Path,
116 latency: Duration,
117 ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
118 let (mut tx, rx) = postage::mpsc::channel(64);
119 let (stream, handle) = EventStream::new(&[path], latency);
120 std::mem::forget(handle);
121 std::thread::spawn(move || {
122 stream.run(move |events| smol::block_on(tx.send(events)).is_ok());
123 });
124 Box::pin(rx)
125 }
126
127 fn is_fake(&self) -> bool {
128 false
129 }
130
131 #[cfg(any(test, feature = "test-support"))]
132 fn as_fake(&self) -> &FakeFs {
133 panic!("called `RealFs::as_fake`")
134 }
135}
136
137#[cfg(any(test, feature = "test-support"))]
138#[derive(Clone, Debug)]
139struct FakeFsEntry {
140 metadata: Metadata,
141 content: Option<String>,
142}
143
144#[cfg(any(test, feature = "test-support"))]
145struct FakeFsState {
146 entries: std::collections::BTreeMap<PathBuf, FakeFsEntry>,
147 next_inode: u64,
148 events_tx: postage::broadcast::Sender<Vec<fsevent::Event>>,
149}
150
151#[cfg(any(test, feature = "test-support"))]
152impl FakeFsState {
153 fn validate_path(&self, path: &Path) -> Result<()> {
154 if path.is_absolute()
155 && path
156 .parent()
157 .and_then(|path| self.entries.get(path))
158 .map_or(false, |e| e.metadata.is_dir)
159 {
160 Ok(())
161 } else {
162 Err(anyhow!("invalid path {:?}", path))
163 }
164 }
165
166 async fn emit_event(&mut self, paths: &[&Path]) {
167 let events = paths
168 .iter()
169 .map(|path| fsevent::Event {
170 event_id: 0,
171 flags: fsevent::StreamFlags::empty(),
172 path: path.to_path_buf(),
173 })
174 .collect();
175
176 let _ = self.events_tx.send(events).await;
177 }
178}
179
180#[cfg(any(test, feature = "test-support"))]
181pub struct FakeFs {
182 // Use an unfair lock to ensure tests are deterministic.
183 state: futures::lock::Mutex<FakeFsState>,
184}
185
186#[cfg(any(test, feature = "test-support"))]
187impl FakeFs {
188 pub fn new() -> Self {
189 let (events_tx, _) = postage::broadcast::channel(2048);
190 let mut entries = std::collections::BTreeMap::new();
191 entries.insert(
192 Path::new("/").to_path_buf(),
193 FakeFsEntry {
194 metadata: Metadata {
195 inode: 0,
196 mtime: SystemTime::now(),
197 is_dir: true,
198 is_symlink: false,
199 },
200 content: None,
201 },
202 );
203 Self {
204 state: futures::lock::Mutex::new(FakeFsState {
205 entries,
206 next_inode: 1,
207 events_tx,
208 }),
209 }
210 }
211
212 pub async fn insert_dir(&self, path: impl AsRef<Path>) -> Result<()> {
213 let mut state = self.state.lock().await;
214 let path = path.as_ref();
215 state.validate_path(path)?;
216
217 let inode = state.next_inode;
218 state.next_inode += 1;
219 state.entries.insert(
220 path.to_path_buf(),
221 FakeFsEntry {
222 metadata: Metadata {
223 inode,
224 mtime: SystemTime::now(),
225 is_dir: true,
226 is_symlink: false,
227 },
228 content: None,
229 },
230 );
231 state.emit_event(&[path]).await;
232 Ok(())
233 }
234
235 pub async fn insert_file(&self, path: impl AsRef<Path>, content: String) -> Result<()> {
236 let mut state = self.state.lock().await;
237 let path = path.as_ref();
238 state.validate_path(path)?;
239
240 let inode = state.next_inode;
241 state.next_inode += 1;
242 state.entries.insert(
243 path.to_path_buf(),
244 FakeFsEntry {
245 metadata: Metadata {
246 inode,
247 mtime: SystemTime::now(),
248 is_dir: false,
249 is_symlink: false,
250 },
251 content: Some(content),
252 },
253 );
254 state.emit_event(&[path]).await;
255 Ok(())
256 }
257
258 #[must_use]
259 pub fn insert_tree<'a>(
260 &'a self,
261 path: impl 'a + AsRef<Path> + Send,
262 tree: serde_json::Value,
263 ) -> futures::future::BoxFuture<'a, ()> {
264 use futures::FutureExt as _;
265 use serde_json::Value::*;
266
267 async move {
268 let path = path.as_ref();
269
270 match tree {
271 Object(map) => {
272 self.insert_dir(path).await.unwrap();
273 for (name, contents) in map {
274 let mut path = PathBuf::from(path);
275 path.push(name);
276 self.insert_tree(&path, contents).await;
277 }
278 }
279 Null => {
280 self.insert_dir(&path).await.unwrap();
281 }
282 String(contents) => {
283 self.insert_file(&path, contents).await.unwrap();
284 }
285 _ => {
286 panic!("JSON object must contain only objects, strings, or null");
287 }
288 }
289 }
290 .boxed()
291 }
292
293 pub async fn remove(&self, path: &Path) -> Result<()> {
294 let mut state = self.state.lock().await;
295 state.validate_path(path)?;
296 state.entries.retain(|path, _| !path.starts_with(path));
297 state.emit_event(&[path]).await;
298 Ok(())
299 }
300
301 pub async fn rename(&self, source: &Path, target: &Path) -> Result<()> {
302 let mut state = self.state.lock().await;
303 state.validate_path(source)?;
304 state.validate_path(target)?;
305 if state.entries.contains_key(target) {
306 Err(anyhow!("target path already exists"))
307 } else {
308 let mut removed = Vec::new();
309 state.entries.retain(|path, entry| {
310 if let Ok(relative_path) = path.strip_prefix(source) {
311 removed.push((relative_path.to_path_buf(), entry.clone()));
312 false
313 } else {
314 true
315 }
316 });
317
318 for (relative_path, entry) in removed {
319 let new_path = target.join(relative_path);
320 state.entries.insert(new_path, entry);
321 }
322
323 state.emit_event(&[source, target]).await;
324 Ok(())
325 }
326 }
327}
328
329#[cfg(any(test, feature = "test-support"))]
330#[async_trait::async_trait]
331impl Fs for FakeFs {
332 async fn load(&self, path: &Path) -> Result<String> {
333 let state = self.state.lock().await;
334 let text = state
335 .entries
336 .get(path)
337 .and_then(|e| e.content.as_ref())
338 .ok_or_else(|| anyhow!("file {:?} does not exist", path))?;
339 Ok(text.clone())
340 }
341
342 async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
343 let mut state = self.state.lock().await;
344 state.validate_path(path)?;
345 if let Some(entry) = state.entries.get_mut(path) {
346 if entry.metadata.is_dir {
347 Err(anyhow!("cannot overwrite a directory with a file"))
348 } else {
349 entry.content = Some(text.chunks().collect());
350 entry.metadata.mtime = SystemTime::now();
351 state.emit_event(&[path]).await;
352 Ok(())
353 }
354 } else {
355 let inode = state.next_inode;
356 state.next_inode += 1;
357 let entry = FakeFsEntry {
358 metadata: Metadata {
359 inode,
360 mtime: SystemTime::now(),
361 is_dir: false,
362 is_symlink: false,
363 },
364 content: Some(text.chunks().collect()),
365 };
366 state.entries.insert(path.to_path_buf(), entry);
367 state.emit_event(&[path]).await;
368 Ok(())
369 }
370 }
371
372 async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
373 Ok(path.to_path_buf())
374 }
375
376 async fn is_file(&self, path: &Path) -> bool {
377 let state = self.state.lock().await;
378 state
379 .entries
380 .get(path)
381 .map_or(false, |entry| !entry.metadata.is_dir)
382 }
383
384 async fn metadata(&self, path: &Path) -> Result<Option<Metadata>> {
385 let state = self.state.lock().await;
386 Ok(state.entries.get(path).map(|entry| entry.metadata.clone()))
387 }
388
389 async fn read_dir(
390 &self,
391 abs_path: &Path,
392 ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>> {
393 use futures::{future, stream};
394 let state = self.state.lock().await;
395 let abs_path = abs_path.to_path_buf();
396 Ok(Box::pin(stream::iter(state.entries.clone()).filter_map(
397 move |(child_path, _)| {
398 future::ready(if child_path.parent() == Some(&abs_path) {
399 Some(Ok(child_path))
400 } else {
401 None
402 })
403 },
404 )))
405 }
406
407 async fn watch(
408 &self,
409 path: &Path,
410 _: Duration,
411 ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
412 let state = self.state.lock().await;
413 let rx = state.events_tx.subscribe();
414 let path = path.to_path_buf();
415 Box::pin(futures::StreamExt::filter(rx, move |events| {
416 let result = events.iter().any(|event| event.path.starts_with(&path));
417 async move { result }
418 }))
419 }
420
421 fn is_fake(&self) -> bool {
422 true
423 }
424
425 #[cfg(any(test, feature = "test-support"))]
426 fn as_fake(&self) -> &FakeFs {
427 self
428 }
429}