1use super::editor::Rope;
2use anyhow::{anyhow, Result};
3use fsevent::EventStream;
4use futures::{Stream, StreamExt};
5use postage::prelude::Sink as _;
6use smol::io::{AsyncReadExt, AsyncWriteExt};
7use std::{
8 io,
9 os::unix::fs::MetadataExt,
10 path::{Path, PathBuf},
11 pin::Pin,
12 time::{Duration, SystemTime},
13};
14
15#[async_trait::async_trait]
16pub trait Fs: Send + Sync {
17 async fn load(&self, path: &Path) -> Result<String>;
18 async fn save(&self, path: &Path, text: &Rope) -> Result<()>;
19 async fn canonicalize(&self, path: &Path) -> Result<PathBuf>;
20 async fn is_file(&self, path: &Path) -> bool;
21 async fn metadata(&self, path: &Path) -> Result<Option<Metadata>>;
22 async fn read_dir(
23 &self,
24 path: &Path,
25 ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>>;
26 async fn watch(
27 &self,
28 path: &Path,
29 latency: Duration,
30 ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>>;
31 fn is_fake(&self) -> bool;
32}
33
34#[derive(Clone, Debug)]
35pub struct Metadata {
36 pub inode: u64,
37 pub mtime: SystemTime,
38 pub is_symlink: bool,
39 pub is_dir: bool,
40}
41
42pub struct RealFs;
43
44#[async_trait::async_trait]
45impl Fs for RealFs {
46 async fn load(&self, path: &Path) -> Result<String> {
47 let mut file = smol::fs::File::open(path).await?;
48 let mut text = String::new();
49 file.read_to_string(&mut text).await?;
50 Ok(text)
51 }
52
53 async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
54 let buffer_size = text.summary().bytes.min(10 * 1024);
55 let file = smol::fs::File::create(path).await?;
56 let mut writer = smol::io::BufWriter::with_capacity(buffer_size, file);
57 for chunk in text.chunks() {
58 writer.write_all(chunk.as_bytes()).await?;
59 }
60 writer.flush().await?;
61 Ok(())
62 }
63
64 async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
65 Ok(smol::fs::canonicalize(path).await?)
66 }
67
68 async fn is_file(&self, path: &Path) -> bool {
69 smol::fs::metadata(path)
70 .await
71 .map_or(false, |metadata| metadata.is_file())
72 }
73
74 async fn metadata(&self, path: &Path) -> Result<Option<Metadata>> {
75 let symlink_metadata = match smol::fs::symlink_metadata(path).await {
76 Ok(metadata) => metadata,
77 Err(err) => {
78 return match (err.kind(), err.raw_os_error()) {
79 (io::ErrorKind::NotFound, _) => Ok(None),
80 (io::ErrorKind::Other, Some(libc::ENOTDIR)) => Ok(None),
81 _ => Err(anyhow::Error::new(err)),
82 }
83 }
84 };
85
86 let is_symlink = symlink_metadata.file_type().is_symlink();
87 let metadata = if is_symlink {
88 smol::fs::metadata(path).await?
89 } else {
90 symlink_metadata
91 };
92 Ok(Some(Metadata {
93 inode: metadata.ino(),
94 mtime: metadata.modified().unwrap(),
95 is_symlink,
96 is_dir: metadata.file_type().is_dir(),
97 }))
98 }
99
100 async fn read_dir(
101 &self,
102 path: &Path,
103 ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>> {
104 let result = smol::fs::read_dir(path).await?.map(|entry| match entry {
105 Ok(entry) => Ok(entry.path()),
106 Err(error) => Err(anyhow!("failed to read dir entry {:?}", error)),
107 });
108 Ok(Box::pin(result))
109 }
110
111 async fn watch(
112 &self,
113 path: &Path,
114 latency: Duration,
115 ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
116 let (mut tx, rx) = postage::mpsc::channel(64);
117 let (stream, handle) = EventStream::new(&[path], latency);
118 std::mem::forget(handle);
119 std::thread::spawn(move || {
120 stream.run(move |events| smol::block_on(tx.send(events)).is_ok());
121 });
122 Box::pin(rx)
123 }
124
125 fn is_fake(&self) -> bool {
126 false
127 }
128}
129
130#[derive(Clone, Debug)]
131struct FakeFsEntry {
132 metadata: Metadata,
133 content: Option<String>,
134}
135
136#[cfg(any(test, feature = "test-support"))]
137struct FakeFsState {
138 entries: std::collections::BTreeMap<PathBuf, FakeFsEntry>,
139 next_inode: u64,
140 events_tx: postage::broadcast::Sender<Vec<fsevent::Event>>,
141}
142
143#[cfg(any(test, feature = "test-support"))]
144impl FakeFsState {
145 fn validate_path(&self, path: &Path) -> Result<()> {
146 if path.is_absolute()
147 && path
148 .parent()
149 .and_then(|path| self.entries.get(path))
150 .map_or(false, |e| e.metadata.is_dir)
151 {
152 Ok(())
153 } else {
154 Err(anyhow!("invalid path {:?}", path))
155 }
156 }
157
158 async fn emit_event(&mut self, paths: &[&Path]) {
159 let events = paths
160 .iter()
161 .map(|path| fsevent::Event {
162 event_id: 0,
163 flags: fsevent::StreamFlags::empty(),
164 path: path.to_path_buf(),
165 })
166 .collect();
167
168 let _ = self.events_tx.send(events).await;
169 }
170}
171
172#[cfg(any(test, feature = "test-support"))]
173pub struct FakeFs {
174 // Use an unfair lock to ensure tests are deterministic.
175 state: futures::lock::Mutex<FakeFsState>,
176}
177
178#[cfg(any(test, feature = "test-support"))]
179impl FakeFs {
180 pub fn new() -> Self {
181 let (events_tx, _) = postage::broadcast::channel(2048);
182 let mut entries = std::collections::BTreeMap::new();
183 entries.insert(
184 Path::new("/").to_path_buf(),
185 FakeFsEntry {
186 metadata: Metadata {
187 inode: 0,
188 mtime: SystemTime::now(),
189 is_dir: true,
190 is_symlink: false,
191 },
192 content: None,
193 },
194 );
195 Self {
196 state: futures::lock::Mutex::new(FakeFsState {
197 entries,
198 next_inode: 1,
199 events_tx,
200 }),
201 }
202 }
203
204 pub async fn insert_dir(&self, path: impl AsRef<Path>) -> Result<()> {
205 let mut state = self.state.lock().await;
206 let path = path.as_ref();
207 state.validate_path(path)?;
208
209 let inode = state.next_inode;
210 state.next_inode += 1;
211 state.entries.insert(
212 path.to_path_buf(),
213 FakeFsEntry {
214 metadata: Metadata {
215 inode,
216 mtime: SystemTime::now(),
217 is_dir: true,
218 is_symlink: false,
219 },
220 content: None,
221 },
222 );
223 state.emit_event(&[path]).await;
224 Ok(())
225 }
226
227 pub async fn insert_file(&self, path: impl AsRef<Path>, content: String) -> Result<()> {
228 let mut state = self.state.lock().await;
229 let path = path.as_ref();
230 state.validate_path(path)?;
231
232 let inode = state.next_inode;
233 state.next_inode += 1;
234 state.entries.insert(
235 path.to_path_buf(),
236 FakeFsEntry {
237 metadata: Metadata {
238 inode,
239 mtime: SystemTime::now(),
240 is_dir: false,
241 is_symlink: false,
242 },
243 content: Some(content),
244 },
245 );
246 state.emit_event(&[path]).await;
247 Ok(())
248 }
249
250 #[must_use]
251 pub fn insert_tree<'a>(
252 &'a self,
253 path: impl 'a + AsRef<Path> + Send,
254 tree: serde_json::Value,
255 ) -> futures::future::BoxFuture<'a, ()> {
256 use futures::FutureExt as _;
257 use serde_json::Value::*;
258
259 async move {
260 let path = path.as_ref();
261
262 match tree {
263 Object(map) => {
264 self.insert_dir(path).await.unwrap();
265 for (name, contents) in map {
266 let mut path = PathBuf::from(path);
267 path.push(name);
268 self.insert_tree(&path, contents).await;
269 }
270 }
271 Null => {
272 self.insert_dir(&path).await.unwrap();
273 }
274 String(contents) => {
275 self.insert_file(&path, contents).await.unwrap();
276 }
277 _ => {
278 panic!("JSON object must contain only objects, strings, or null");
279 }
280 }
281 }
282 .boxed()
283 }
284
285 pub async fn remove(&self, path: &Path) -> Result<()> {
286 let mut state = self.state.lock().await;
287 state.validate_path(path)?;
288 state.entries.retain(|path, _| !path.starts_with(path));
289 state.emit_event(&[path]).await;
290 Ok(())
291 }
292
293 pub async fn rename(&self, source: &Path, target: &Path) -> Result<()> {
294 let mut state = self.state.lock().await;
295 state.validate_path(source)?;
296 state.validate_path(target)?;
297 if state.entries.contains_key(target) {
298 Err(anyhow!("target path already exists"))
299 } else {
300 let mut removed = Vec::new();
301 state.entries.retain(|path, entry| {
302 if let Ok(relative_path) = path.strip_prefix(source) {
303 removed.push((relative_path.to_path_buf(), entry.clone()));
304 false
305 } else {
306 true
307 }
308 });
309
310 for (relative_path, entry) in removed {
311 let new_path = target.join(relative_path);
312 state.entries.insert(new_path, entry);
313 }
314
315 state.emit_event(&[source, target]).await;
316 Ok(())
317 }
318 }
319}
320
321#[cfg(any(test, feature = "test-support"))]
322#[async_trait::async_trait]
323impl Fs for FakeFs {
324 async fn load(&self, path: &Path) -> Result<String> {
325 let state = self.state.lock().await;
326 let text = state
327 .entries
328 .get(path)
329 .and_then(|e| e.content.as_ref())
330 .ok_or_else(|| anyhow!("file {:?} does not exist", path))?;
331 Ok(text.clone())
332 }
333
334 async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
335 let mut state = self.state.lock().await;
336 state.validate_path(path)?;
337 if let Some(entry) = state.entries.get_mut(path) {
338 if entry.metadata.is_dir {
339 Err(anyhow!("cannot overwrite a directory with a file"))
340 } else {
341 entry.content = Some(text.chunks().collect());
342 entry.metadata.mtime = SystemTime::now();
343 state.emit_event(&[path]).await;
344 Ok(())
345 }
346 } else {
347 let inode = state.next_inode;
348 state.next_inode += 1;
349 let entry = FakeFsEntry {
350 metadata: Metadata {
351 inode,
352 mtime: SystemTime::now(),
353 is_dir: false,
354 is_symlink: false,
355 },
356 content: Some(text.chunks().collect()),
357 };
358 state.entries.insert(path.to_path_buf(), entry);
359 state.emit_event(&[path]).await;
360 Ok(())
361 }
362 }
363
364 async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
365 Ok(path.to_path_buf())
366 }
367
368 async fn is_file(&self, path: &Path) -> bool {
369 let state = self.state.lock().await;
370 state
371 .entries
372 .get(path)
373 .map_or(false, |entry| !entry.metadata.is_dir)
374 }
375
376 async fn metadata(&self, path: &Path) -> Result<Option<Metadata>> {
377 let state = self.state.lock().await;
378 Ok(state.entries.get(path).map(|entry| entry.metadata.clone()))
379 }
380
381 async fn read_dir(
382 &self,
383 abs_path: &Path,
384 ) -> Result<Pin<Box<dyn Send + Stream<Item = Result<PathBuf>>>>> {
385 use futures::{future, stream};
386 let state = self.state.lock().await;
387 let abs_path = abs_path.to_path_buf();
388 Ok(Box::pin(stream::iter(state.entries.clone()).filter_map(
389 move |(child_path, _)| {
390 future::ready(if child_path.parent() == Some(&abs_path) {
391 Some(Ok(child_path))
392 } else {
393 None
394 })
395 },
396 )))
397 }
398
399 async fn watch(
400 &self,
401 path: &Path,
402 _: Duration,
403 ) -> Pin<Box<dyn Send + Stream<Item = Vec<fsevent::Event>>>> {
404 let state = self.state.lock().await;
405 let rx = state.events_tx.subscribe();
406 let path = path.to_path_buf();
407 Box::pin(futures::StreamExt::filter(rx, move |events| {
408 let result = events.iter().any(|event| event.path.starts_with(&path));
409 async move { result }
410 }))
411 }
412
413 fn is_fake(&self) -> bool {
414 true
415 }
416}