1mod char_bag;
2mod fuzzy;
3mod ignore;
4
5use self::{char_bag::CharBag, ignore::IgnoreStack};
6use crate::{
7 editor::{self, Buffer, History, Operation, Rope},
8 language::LanguageRegistry,
9 rpc::{self, proto},
10 sum_tree::{self, Cursor, Edit, SumTree},
11 time::{self, ReplicaId},
12 util::Bias,
13};
14use ::ignore::gitignore::Gitignore;
15use anyhow::{anyhow, Context, Result};
16use atomic::Ordering::SeqCst;
17use futures::{Stream, StreamExt};
18pub use fuzzy::{match_paths, PathMatch};
19use gpui::{
20 executor, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
21 Task, WeakModelHandle,
22};
23use lazy_static::lazy_static;
24use parking_lot::Mutex;
25use postage::{
26 prelude::{Sink as _, Stream as _},
27 watch,
28};
29use smol::{
30 channel::{self, Sender},
31 io::{AsyncReadExt, AsyncWriteExt},
32};
33use std::{
34 cmp::{self, Ordering},
35 collections::HashMap,
36 convert::{TryFrom, TryInto},
37 ffi::{OsStr, OsString},
38 fmt,
39 future::Future,
40 io,
41 ops::Deref,
42 os::unix::fs::MetadataExt,
43 path::{Path, PathBuf},
44 pin::Pin,
45 sync::{
46 atomic::{self, AtomicUsize},
47 Arc,
48 },
49 time::{Duration, SystemTime},
50};
51use zed_rpc::{ForegroundRouter, PeerId, TypedEnvelope};
52
lazy_static! {
    // The file name `.gitignore` as an `OsStr`, constructed lazily on first access.
    static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
}
56
/// Registers this module's RPC message handlers.
///
/// Each `remote::*` handler is attached to `router` via `rpc.on_message`.
pub fn init(cx: &mut MutableAppContext, rpc: &rpc::Client, router: &mut ForegroundRouter) {
    rpc.on_message(router, remote::add_peer, cx);
    rpc.on_message(router, remote::remove_peer, cx);
    rpc.on_message(router, remote::update_worktree, cx);
    rpc.on_message(router, remote::open_buffer, cx);
    rpc.on_message(router, remote::close_buffer, cx);
    rpc.on_message(router, remote::update_buffer, cx);
    rpc.on_message(router, remote::buffer_saved, cx);
    rpc.on_message(router, remote::save_buffer, cx);
}
67
/// Filesystem operations used by the worktree.
///
/// This file provides an implementation backed by the real filesystem
/// (`ProductionFs`) and, for tests, an in-memory one (`InMemoryFs`).
#[async_trait::async_trait]
pub trait Fs: Send + Sync {
    /// Builds the entry for `abs_path`, recorded under the worktree-relative `path`.
    ///
    /// The implementations in this file return `Ok(None)` when nothing exists at
    /// `abs_path`, and take the new entry's id from `next_entry_id`.
    async fn entry(
        &self,
        root_char_bag: CharBag,
        next_entry_id: &AtomicUsize,
        path: Arc<Path>,
        abs_path: &Path,
    ) -> Result<Option<Entry>>;
    /// Streams one entry per direct child of the directory at `abs_path`.
    ///
    /// Each child's relative path is `path` joined with the child's file name.
    async fn child_entries<'a>(
        &self,
        root_char_bag: CharBag,
        next_entry_id: &'a AtomicUsize,
        path: &'a Path,
        abs_path: &'a Path,
    ) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>>;
    /// Reads the file at `path` into a `String`.
    async fn load(&self, path: &Path) -> Result<String>;
    /// Writes the contents of `text` to the file at `path`.
    async fn save(&self, path: &Path, text: &Rope) -> Result<()>;
    /// Returns the canonical form of `path`.
    async fn canonicalize(&self, path: &Path) -> Result<PathBuf>;
}
88
/// [`Fs`] implementation backed by the real filesystem through `smol::fs`.
struct ProductionFs;
90
91#[async_trait::async_trait]
92impl Fs for ProductionFs {
93 async fn entry(
94 &self,
95 root_char_bag: CharBag,
96 next_entry_id: &AtomicUsize,
97 path: Arc<Path>,
98 abs_path: &Path,
99 ) -> Result<Option<Entry>> {
100 let metadata = match smol::fs::metadata(&abs_path).await {
101 Err(err) => {
102 return match (err.kind(), err.raw_os_error()) {
103 (io::ErrorKind::NotFound, _) => Ok(None),
104 (io::ErrorKind::Other, Some(libc::ENOTDIR)) => Ok(None),
105 _ => Err(anyhow::Error::new(err)),
106 }
107 }
108 Ok(metadata) => metadata,
109 };
110 let inode = metadata.ino();
111 let mtime = metadata.modified()?;
112 let is_symlink = smol::fs::symlink_metadata(&abs_path)
113 .await
114 .context("failed to read symlink metadata")?
115 .file_type()
116 .is_symlink();
117
118 let entry = Entry {
119 id: next_entry_id.fetch_add(1, SeqCst),
120 kind: if metadata.file_type().is_dir() {
121 EntryKind::PendingDir
122 } else {
123 EntryKind::File(char_bag_for_path(root_char_bag, &path))
124 },
125 path: Arc::from(path),
126 inode,
127 mtime,
128 is_symlink,
129 is_ignored: false,
130 };
131
132 Ok(Some(entry))
133 }
134
135 async fn child_entries<'a>(
136 &self,
137 root_char_bag: CharBag,
138 next_entry_id: &'a AtomicUsize,
139 path: &'a Path,
140 abs_path: &'a Path,
141 ) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>> {
142 let entries = smol::fs::read_dir(abs_path).await?;
143 Ok(entries
144 .then(move |entry| async move {
145 let child_entry = entry?;
146 let child_name = child_entry.file_name();
147 let child_path: Arc<Path> = path.join(&child_name).into();
148 let child_abs_path = abs_path.join(&child_name);
149 let child_is_symlink = child_entry.metadata().await?.file_type().is_symlink();
150 let child_metadata = smol::fs::metadata(child_abs_path).await?;
151 let child_inode = child_metadata.ino();
152 let child_mtime = child_metadata.modified()?;
153 Ok(Entry {
154 id: next_entry_id.fetch_add(1, SeqCst),
155 kind: if child_metadata.file_type().is_dir() {
156 EntryKind::PendingDir
157 } else {
158 EntryKind::File(char_bag_for_path(root_char_bag, &child_path))
159 },
160 path: child_path,
161 inode: child_inode,
162 mtime: child_mtime,
163 is_symlink: child_is_symlink,
164 is_ignored: false,
165 })
166 })
167 .boxed())
168 }
169
170 async fn load(&self, path: &Path) -> Result<String> {
171 let mut file = smol::fs::File::open(path).await?;
172 let mut text = String::new();
173 file.read_to_string(&mut text).await?;
174 Ok(text)
175 }
176
177 async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
178 let buffer_size = text.summary().bytes.min(10 * 1024);
179 let file = smol::fs::File::create(path).await?;
180 let mut writer = smol::io::BufWriter::with_capacity(buffer_size, file);
181 for chunk in text.chunks() {
182 writer.write_all(chunk.as_bytes()).await?;
183 }
184 writer.flush().await?;
185 Ok(())
186 }
187
188 async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
189 Ok(smol::fs::canonicalize(path).await?)
190 }
191}
192
/// A single file or directory stored by the in-memory filesystem.
///
/// Gated like the in-memory filesystem types that use it, so it is not
/// compiled as dead code in regular builds.
#[cfg(any(test, feature = "test-support"))]
#[derive(Clone)]
struct InMemoryEntry {
    // Identifier handed out from `InMemoryFsState::next_inode` (the root uses 0).
    inode: u64,
    // Time the entry was created or last saved.
    mtime: SystemTime,
    is_dir: bool,
    is_symlink: bool,
    // File contents; the directory entries created in this file use `None`.
    content: Option<String>,
}
201
/// Mutable state of [`InMemoryFs`], kept behind its lock.
#[cfg(any(test, feature = "test-support"))]
struct InMemoryFsState {
    // All entries, keyed by absolute path.
    entries: std::collections::BTreeMap<PathBuf, InMemoryEntry>,
    // Inode number to assign to the next created entry.
    next_inode: u64,
    // Broadcast channel on which change events are published.
    events_tx: postage::broadcast::Sender<fsevent::Event>,
}
208
#[cfg(any(test, feature = "test-support"))]
impl InMemoryFsState {
    /// Accepts `path` only if it is absolute and its parent is an existing
    /// directory entry; otherwise returns an "invalid path" error.
    fn validate_path(&self, path: &Path) -> Result<()> {
        let parent_is_dir = path
            .parent()
            .and_then(|parent| self.entries.get(parent))
            .map_or(false, |parent_entry| parent_entry.is_dir);

        if path.is_absolute() && parent_is_dir {
            Ok(())
        } else {
            Err(anyhow!("invalid path {:?}", path))
        }
    }

    /// Publishes a change event for `path`; a failed send is ignored.
    async fn emit_event(&mut self, path: &Path) {
        let event = fsevent::Event {
            event_id: 0,
            flags: fsevent::StreamFlags::empty(),
            path: path.to_path_buf(),
        };
        let _ = self.events_tx.send(event).await;
    }
}
235
/// In-memory [`Fs`] implementation, available in test builds and under the
/// `test-support` feature.
#[cfg(any(test, feature = "test-support"))]
pub struct InMemoryFs {
    // All state lives behind an async read/write lock.
    state: smol::lock::RwLock<InMemoryFsState>,
}
240
241#[cfg(any(test, feature = "test-support"))]
242impl InMemoryFs {
243 pub fn new() -> Self {
244 let (events_tx, _) = postage::broadcast::channel(2048);
245 let mut entries = std::collections::BTreeMap::new();
246 entries.insert(
247 Path::new("/").to_path_buf(),
248 InMemoryEntry {
249 inode: 0,
250 mtime: SystemTime::now(),
251 is_dir: true,
252 is_symlink: false,
253 content: None,
254 },
255 );
256 Self {
257 state: smol::lock::RwLock::new(InMemoryFsState {
258 entries,
259 next_inode: 1,
260 events_tx,
261 }),
262 }
263 }
264
265 pub async fn insert_dir(&self, path: &Path) -> Result<()> {
266 let mut state = self.state.write().await;
267 state.validate_path(path)?;
268
269 let inode = state.next_inode;
270 state.next_inode += 1;
271 state.entries.insert(
272 path.to_path_buf(),
273 InMemoryEntry {
274 inode,
275 mtime: SystemTime::now(),
276 is_dir: true,
277 is_symlink: false,
278 content: None,
279 },
280 );
281 state.emit_event(path).await;
282 Ok(())
283 }
284
285 pub async fn remove(&self, path: &Path) -> Result<()> {
286 let mut state = self.state.write().await;
287 state.validate_path(path)?;
288 state.entries.retain(|path, _| !path.starts_with(path));
289 state.emit_event(&path).await;
290 Ok(())
291 }
292
293 pub async fn rename(&self, source: &Path, target: &Path) -> Result<()> {
294 let mut state = self.state.write().await;
295 state.validate_path(source)?;
296 state.validate_path(target)?;
297 if state.entries.contains_key(target) {
298 Err(anyhow!("target path already exists"))
299 } else {
300 let mut removed = Vec::new();
301 state.entries.retain(|path, entry| {
302 if let Ok(relative_path) = path.strip_prefix(source) {
303 removed.push((relative_path.to_path_buf(), entry.clone()));
304 false
305 } else {
306 true
307 }
308 });
309
310 for (relative_path, entry) in removed {
311 let new_path = target.join(relative_path);
312 state.entries.insert(new_path, entry);
313 }
314
315 Ok(())
316 }
317 }
318
319 pub async fn events(&self) -> postage::broadcast::Receiver<fsevent::Event> {
320 self.state.read().await.events_tx.subscribe()
321 }
322}
323
324#[cfg(any(test, feature = "test-support"))]
325#[async_trait::async_trait]
326impl Fs for InMemoryFs {
327 async fn entry(
328 &self,
329 root_char_bag: CharBag,
330 next_entry_id: &AtomicUsize,
331 path: Arc<Path>,
332 abs_path: &Path,
333 ) -> Result<Option<Entry>> {
334 let state = self.state.read().await;
335 if let Some(entry) = state.entries.get(abs_path) {
336 Ok(Some(Entry {
337 id: next_entry_id.fetch_add(1, SeqCst),
338 kind: if entry.is_dir {
339 EntryKind::PendingDir
340 } else {
341 EntryKind::File(char_bag_for_path(root_char_bag, &path))
342 },
343 path: Arc::from(path),
344 inode: entry.inode,
345 mtime: entry.mtime,
346 is_symlink: entry.is_symlink,
347 is_ignored: false,
348 }))
349 } else {
350 Ok(None)
351 }
352 }
353
    /// Streams an [`Entry`] for every stored entry whose parent is `abs_path`.
    ///
    /// The stream is built from a clone of the entry map, so it owns its data
    /// rather than borrowing from the lock guard.
    async fn child_entries<'a>(
        &self,
        root_char_bag: CharBag,
        next_entry_id: &'a AtomicUsize,
        path: &'a Path,
        abs_path: &'a Path,
    ) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>> {
        use futures::{future, stream};

        let state = self.state.read().await;
        Ok(stream::iter(state.entries.clone())
            // Keep direct children only.
            .filter(move |(child_path, _)| future::ready(child_path.parent() == Some(abs_path)))
            .then(move |(child_abs_path, child_entry)| async move {
                // Yield to the executor before producing each entry.
                smol::future::yield_now().await;
                let child_path = Arc::from(path.join(child_abs_path.file_name().unwrap()));
                Ok(Entry {
                    id: next_entry_id.fetch_add(1, SeqCst),
                    kind: if child_entry.is_dir {
                        EntryKind::PendingDir
                    } else {
                        EntryKind::File(char_bag_for_path(root_char_bag, &child_path))
                    },
                    path: child_path,
                    inode: child_entry.inode,
                    mtime: child_entry.mtime,
                    is_symlink: child_entry.is_symlink,
                    is_ignored: false,
                })
            })
            .boxed())
    }
385
386 async fn load(&self, path: &Path) -> Result<String> {
387 let state = self.state.read().await;
388 let text = state
389 .entries
390 .get(path)
391 .and_then(|e| e.content.as_ref())
392 .ok_or_else(|| anyhow!("file {:?} does not exist", path))?;
393 Ok(text.clone())
394 }
395
396 async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
397 let mut state = self.state.write().await;
398 state.validate_path(path)?;
399 if let Some(entry) = state.entries.get_mut(path) {
400 if entry.is_dir {
401 Err(anyhow!("cannot overwrite a directory with a file"))
402 } else {
403 entry.content = Some(text.chunks().collect());
404 entry.mtime = SystemTime::now();
405 state.emit_event(path).await;
406 Ok(())
407 }
408 } else {
409 let inode = state.next_inode;
410 state.next_inode += 1;
411 let entry = InMemoryEntry {
412 inode,
413 mtime: SystemTime::now(),
414 is_dir: false,
415 is_symlink: false,
416 content: Some(text.chunks().collect()),
417 };
418 state.entries.insert(path.to_path_buf(), entry);
419 state.emit_event(path).await;
420 Ok(())
421 }
422 }
423
424 async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
425 Ok(path.to_path_buf())
426 }
427}
428
/// State reported over the scan-state channel of a local worktree.
#[derive(Clone, Debug)]
enum ScanState {
    Idle,
    // A local worktree starts out in this state.
    Scanning,
    // Carries an error in an `Arc`, which keeps the enum `Clone`.
    Err(Arc<anyhow::Error>),
}
435
/// A worktree that is either backed by a local filesystem ([`LocalWorktree`])
/// or opened over RPC ([`RemoteWorktree`]).
pub enum Worktree {
    Local(LocalWorktree),
    Remote(RemoteWorktree),
}
440
441impl Entity for Worktree {
442 type Event = ();
443
444 fn release(&mut self, cx: &mut MutableAppContext) {
445 let rpc = match self {
446 Self::Local(tree) => tree.rpc.clone(),
447 Self::Remote(tree) => Some((tree.rpc.clone(), tree.remote_id)),
448 };
449
450 if let Some((rpc, worktree_id)) = rpc {
451 cx.spawn(|_| async move {
452 rpc.state
453 .write()
454 .await
455 .shared_worktrees
456 .remove(&worktree_id);
457 if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await {
458 log::error!("error closing worktree {}: {}", worktree_id, err);
459 }
460 })
461 .detach();
462 }
463 }
464}
465
466impl Worktree {
467 pub fn local(
468 path: impl Into<Arc<Path>>,
469 languages: Arc<LanguageRegistry>,
470 cx: &mut ModelContext<Worktree>,
471 ) -> Self {
472 let fs = Arc::new(ProductionFs);
473 let (mut tree, scan_states_tx) =
474 LocalWorktree::new(path, languages, fs.clone(), Duration::from_millis(100), cx);
475 let (event_stream, event_stream_handle) = fsevent::EventStream::new(
476 &[tree.snapshot.abs_path.as_ref()],
477 Duration::from_millis(100),
478 );
479 let background_snapshot = tree.background_snapshot.clone();
480 std::thread::spawn(move || {
481 let scanner = BackgroundScanner::new(
482 background_snapshot,
483 scan_states_tx,
484 fs,
485 Arc::new(executor::Background::new()),
486 );
487 scanner.run(event_stream);
488 });
489 tree._event_stream_handle = Some(event_stream_handle);
490 Worktree::Local(tree)
491 }
492
493 #[cfg(any(test, feature = "test-support"))]
494 pub fn test(
495 path: impl Into<Arc<Path>>,
496 languages: Arc<LanguageRegistry>,
497 fs: Arc<InMemoryFs>,
498 cx: &mut ModelContext<Worktree>,
499 ) -> Self {
500 let (tree, scan_states_tx) =
501 LocalWorktree::new(path, languages, fs.clone(), Duration::ZERO, cx);
502 let background_snapshot = tree.background_snapshot.clone();
503 let fs = fs.clone();
504 let background = cx.background().clone();
505 cx.background()
506 .spawn(async move {
507 let events_rx = fs.events().await;
508 let scanner =
509 BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
510 scanner.run_test(events_rx).await;
511 })
512 .detach();
513 Worktree::Local(tree)
514 }
515
516 pub async fn open_remote(
517 rpc: rpc::Client,
518 id: u64,
519 access_token: String,
520 languages: Arc<LanguageRegistry>,
521 cx: &mut AsyncAppContext,
522 ) -> Result<ModelHandle<Self>> {
523 let response = rpc
524 .request(proto::OpenWorktree {
525 worktree_id: id,
526 access_token,
527 })
528 .await?;
529
530 Worktree::remote(response, rpc, languages, cx).await
531 }
532
    /// Builds a remote worktree model from an `OpenWorktreeResponse`.
    ///
    /// The entry trees are constructed on the background executor, two tasks
    /// are started to apply incoming updates and to poll the resulting
    /// snapshots, and the new model is registered in the RPC client's
    /// `shared_worktrees`. Fails if the response carries no worktree.
    async fn remote(
        open_response: proto::OpenWorktreeResponse,
        rpc: rpc::Client,
        languages: Arc<LanguageRegistry>,
        cx: &mut AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let worktree = open_response
            .worktree
            .ok_or_else(|| anyhow!("empty worktree"))?;

        let remote_id = open_response.worktree_id;
        let replica_id = open_response.replica_id as ReplicaId;
        let peers = open_response.peers;
        // Character bag of the lowercased root name.
        let root_char_bag: CharBag = worktree
            .root_name
            .chars()
            .map(|c| c.to_ascii_lowercase())
            .collect();
        let root_name = worktree.root_name.clone();
        // Convert the received entries into the two entry trees off the main thread.
        let (entries_by_path, entries_by_id) = cx
            .background()
            .spawn(async move {
                let mut entries_by_path_edits = Vec::new();
                let mut entries_by_id_edits = Vec::new();
                for entry in worktree.entries {
                    match Entry::try_from((&root_char_bag, entry)) {
                        Ok(entry) => {
                            entries_by_id_edits.push(Edit::Insert(PathEntry {
                                id: entry.id,
                                path: entry.path.clone(),
                                scan_id: 0,
                            }));
                            entries_by_path_edits.push(Edit::Insert(entry));
                        }
                        // Entries that fail to convert are logged and skipped.
                        Err(err) => log::warn!("error for remote worktree entry {:?}", err),
                    }
                }

                let mut entries_by_path = SumTree::new();
                let mut entries_by_id = SumTree::new();
                entries_by_path.edit(entries_by_path_edits, &());
                entries_by_id.edit(entries_by_id_edits, &());
                (entries_by_path, entries_by_id)
            })
            .await;

        let worktree = cx.update(|cx| {
            cx.add_model(|cx: &mut ModelContext<Worktree>| {
                let snapshot = Snapshot {
                    id: cx.model_id(),
                    scan_id: 0,
                    // The remote snapshot is given an empty absolute path.
                    abs_path: Path::new("").into(),
                    root_name,
                    root_char_bag,
                    ignores: Default::default(),
                    entries_by_path,
                    entries_by_id,
                    removed_entry_ids: Default::default(),
                    next_entry_id: Default::default(),
                };

                let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
                let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());

                // Background task: apply each received update to a copy of the
                // latest snapshot and publish the result on the watch channel.
                cx.background()
                    .spawn(async move {
                        while let Some(update) = updates_rx.recv().await {
                            let mut snapshot = snapshot_tx.borrow().clone();
                            if let Err(error) = snapshot.apply_update(update) {
                                log::error!("error applying worktree update: {}", error);
                            }
                            *snapshot_tx.borrow_mut() = snapshot;
                        }
                    })
                    .detach();

                // Foreground task: poll the model whenever a snapshot is
                // published, stopping once the model can no longer be upgraded.
                {
                    let mut snapshot_rx = snapshot_rx.clone();
                    cx.spawn_weak(|this, mut cx| async move {
                        while let Some(_) = snapshot_rx.recv().await {
                            if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
                                this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
                            } else {
                                break;
                            }
                        }
                    })
                    .detach();
                }

                Worktree::Remote(RemoteWorktree {
                    remote_id,
                    replica_id,
                    snapshot,
                    snapshot_rx,
                    updates_tx,
                    rpc: rpc.clone(),
                    open_buffers: Default::default(),
                    peers: peers
                        .into_iter()
                        .map(|p| (PeerId(p.peer_id), p.replica_id as ReplicaId))
                        .collect(),
                    languages,
                })
            })
        });
        // Register a weak handle to the model with the RPC client under its worktree id.
        rpc.state
            .write()
            .await
            .shared_worktrees
            .insert(open_response.worktree_id, worktree.downgrade());

        Ok(worktree)
    }
647
648 pub fn as_local(&self) -> Option<&LocalWorktree> {
649 if let Worktree::Local(worktree) = self {
650 Some(worktree)
651 } else {
652 None
653 }
654 }
655
656 pub fn as_local_mut(&mut self) -> Option<&mut LocalWorktree> {
657 if let Worktree::Local(worktree) = self {
658 Some(worktree)
659 } else {
660 None
661 }
662 }
663
664 pub fn as_remote_mut(&mut self) -> Option<&mut RemoteWorktree> {
665 if let Worktree::Remote(worktree) = self {
666 Some(worktree)
667 } else {
668 None
669 }
670 }
671
672 pub fn snapshot(&self) -> Snapshot {
673 match self {
674 Worktree::Local(worktree) => worktree.snapshot(),
675 Worktree::Remote(worktree) => worktree.snapshot(),
676 }
677 }
678
679 pub fn replica_id(&self) -> ReplicaId {
680 match self {
681 Worktree::Local(_) => 0,
682 Worktree::Remote(worktree) => worktree.replica_id,
683 }
684 }
685
686 pub fn add_peer(
687 &mut self,
688 envelope: TypedEnvelope<proto::AddPeer>,
689 cx: &mut ModelContext<Worktree>,
690 ) -> Result<()> {
691 match self {
692 Worktree::Local(worktree) => worktree.add_peer(envelope, cx),
693 Worktree::Remote(worktree) => worktree.add_peer(envelope, cx),
694 }
695 }
696
697 pub fn remove_peer(
698 &mut self,
699 envelope: TypedEnvelope<proto::RemovePeer>,
700 cx: &mut ModelContext<Worktree>,
701 ) -> Result<()> {
702 match self {
703 Worktree::Local(worktree) => worktree.remove_peer(envelope, cx),
704 Worktree::Remote(worktree) => worktree.remove_peer(envelope, cx),
705 }
706 }
707
708 pub fn peers(&self) -> &HashMap<PeerId, ReplicaId> {
709 match self {
710 Worktree::Local(worktree) => &worktree.peers,
711 Worktree::Remote(worktree) => &worktree.peers,
712 }
713 }
714
715 pub fn open_buffer(
716 &mut self,
717 path: impl AsRef<Path>,
718 cx: &mut ModelContext<Self>,
719 ) -> Task<Result<ModelHandle<Buffer>>> {
720 match self {
721 Worktree::Local(worktree) => worktree.open_buffer(path.as_ref(), cx),
722 Worktree::Remote(worktree) => worktree.open_buffer(path.as_ref(), cx),
723 }
724 }
725
726 #[cfg(feature = "test-support")]
727 pub fn has_open_buffer(&self, path: impl AsRef<Path>, cx: &AppContext) -> bool {
728 let open_buffers = match self {
729 Worktree::Local(worktree) => &worktree.open_buffers,
730 Worktree::Remote(worktree) => &worktree.open_buffers,
731 };
732
733 let path = path.as_ref();
734 open_buffers
735 .values()
736 .find(|buffer| {
737 if let Some(file) = buffer.upgrade(cx).and_then(|buffer| buffer.read(cx).file()) {
738 file.path.as_ref() == path
739 } else {
740 false
741 }
742 })
743 .is_some()
744 }
745
746 pub fn update_buffer(
747 &mut self,
748 envelope: proto::UpdateBuffer,
749 cx: &mut ModelContext<Self>,
750 ) -> Result<()> {
751 let open_buffers = match self {
752 Worktree::Local(worktree) => &worktree.open_buffers,
753 Worktree::Remote(worktree) => &worktree.open_buffers,
754 };
755 let buffer = open_buffers
756 .get(&(envelope.buffer_id as usize))
757 .and_then(|buf| buf.upgrade(&cx));
758
759 let buffer = if let Some(buffer) = buffer {
760 buffer
761 } else {
762 return if matches!(self, Worktree::Local(_)) {
763 Err(anyhow!(
764 "invalid buffer {} in update buffer message",
765 envelope.buffer_id
766 ))
767 } else {
768 Ok(())
769 };
770 };
771
772 let ops = envelope
773 .operations
774 .into_iter()
775 .map(|op| op.try_into())
776 .collect::<anyhow::Result<Vec<_>>>()?;
777 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
778 Ok(())
779 }
780
781 pub fn buffer_saved(
782 &mut self,
783 message: proto::BufferSaved,
784 cx: &mut ModelContext<Self>,
785 ) -> Result<()> {
786 if let Worktree::Remote(worktree) = self {
787 if let Some(buffer) = worktree
788 .open_buffers
789 .get(&(message.buffer_id as usize))
790 .and_then(|buf| buf.upgrade(&cx))
791 {
792 buffer.update(cx, |buffer, cx| {
793 let version = message.version.try_into()?;
794 let mtime = message
795 .mtime
796 .ok_or_else(|| anyhow!("missing mtime"))?
797 .into();
798 buffer.did_save(version, mtime, cx);
799 Result::<_, anyhow::Error>::Ok(())
800 })?;
801 }
802 Ok(())
803 } else {
804 Err(anyhow!(
805 "invalid buffer {} in buffer saved message",
806 message.buffer_id
807 ))
808 }
809 }
810
    /// Refreshes the foreground snapshot and, unless a local scan is still in
    /// progress, reconciles every open buffer's file handle with it.
    ///
    /// While a local worktree is scanning, another poll is scheduled after
    /// `poll_interval` (or after a single yield when the interval is zero) and
    /// buffers are left untouched. Always ends with `cx.notify()`.
    fn poll_snapshot(&mut self, cx: &mut ModelContext<Worktree>) {
        let update_buffers = match self {
            Self::Local(worktree) => {
                let poll_interval = worktree.poll_interval;
                // Copy the snapshot maintained by the background scanner.
                worktree.snapshot = worktree.background_snapshot.lock().clone();
                if worktree.is_scanning() {
                    // At most one follow-up poll is pending at a time.
                    if !worktree.poll_scheduled {
                        cx.spawn(|this, mut cx| async move {
                            if poll_interval.is_zero() {
                                smol::future::yield_now().await;
                            } else {
                                smol::Timer::after(poll_interval).await;
                            }
                            this.update(&mut cx, |this, cx| {
                                this.as_local_mut().unwrap().poll_scheduled = false;
                                this.poll_snapshot(cx);
                            })
                        })
                        .detach();
                        worktree.poll_scheduled = true;
                    }
                    false
                } else {
                    true
                }
            }
            Self::Remote(worktree) => {
                worktree.snapshot = worktree.snapshot_rx.borrow().clone();
                true
            }
        };

        if update_buffers {
            // Buffers whose weak handle can no longer be upgraded are collected
            // here and removed from the map after the loop.
            let mut buffers_to_delete = Vec::new();
            for (buffer_id, buffer) in self.open_buffers() {
                if let Some(buffer) = buffer.upgrade(&cx) {
                    buffer.update(cx, |buffer, cx| {
                        let buffer_is_clean = !buffer.is_dirty();

                        if let Some(file) = buffer.file_mut() {
                            let mut file_changed = false;

                            if let Some(entry) = file
                                .entry_id
                                .and_then(|entry_id| self.entry_for_id(entry_id))
                            {
                                // The file's entry still exists: pick up a changed path or mtime.
                                if entry.path != file.path {
                                    file.path = entry.path.clone();
                                    file_changed = true;
                                }

                                if entry.mtime != file.mtime {
                                    file.mtime = entry.mtime;
                                    file_changed = true;
                                    // For local worktrees, a clean buffer is refreshed from the filesystem.
                                    if let Some(worktree) = self.as_local() {
                                        if buffer_is_clean {
                                            let abs_path = worktree.absolutize(&file.path);
                                            refresh_buffer(abs_path, &worktree.fs, cx);
                                        }
                                    }
                                }
                            } else if let Some(entry) = self.entry_for_path(&file.path) {
                                // No entry by id, but one exists at the file's path: adopt it.
                                file.entry_id = Some(entry.id);
                                file.mtime = entry.mtime;
                                if let Some(worktree) = self.as_local() {
                                    if buffer_is_clean {
                                        let abs_path = worktree.absolutize(&file.path);
                                        refresh_buffer(abs_path, &worktree.fs, cx);
                                    }
                                }
                                file_changed = true;
                            } else if !file.is_deleted() {
                                // No entry by id or by path, and the file is not yet
                                // marked deleted: clear its entry id. A previously
                                // clean buffer emits `Dirtied`.
                                if buffer_is_clean {
                                    cx.emit(editor::buffer::Event::Dirtied);
                                }
                                file.entry_id = None;
                                file_changed = true;
                            }

                            if file_changed {
                                cx.emit(editor::buffer::Event::FileHandleChanged);
                            }
                        }
                    });
                } else {
                    buffers_to_delete.push(*buffer_id);
                }
            }

            for buffer_id in buffers_to_delete {
                self.open_buffers_mut().remove(&buffer_id);
            }
        }

        cx.notify();
    }
907
908 fn open_buffers(&self) -> &HashMap<usize, WeakModelHandle<Buffer>> {
909 match self {
910 Self::Local(worktree) => &worktree.open_buffers,
911 Self::Remote(worktree) => &worktree.open_buffers,
912 }
913 }
914
915 fn open_buffers_mut(&mut self) -> &mut HashMap<usize, WeakModelHandle<Buffer>> {
916 match self {
917 Self::Local(worktree) => &mut worktree.open_buffers,
918 Self::Remote(worktree) => &mut worktree.open_buffers,
919 }
920 }
921}
922
923impl Deref for Worktree {
924 type Target = Snapshot;
925
926 fn deref(&self) -> &Self::Target {
927 match self {
928 Worktree::Local(worktree) => &worktree.snapshot,
929 Worktree::Remote(worktree) => &worktree.snapshot,
930 }
931 }
932}
933
/// A worktree backed by a filesystem reachable through [`Fs`].
pub struct LocalWorktree {
    // Foreground copy of the snapshot, refreshed from `background_snapshot` when polling.
    snapshot: Snapshot,
    // Snapshot shared with the background scanner.
    background_snapshot: Arc<Mutex<Snapshot>>,
    // When set, the current snapshot is sent here on each scan-state update
    // that arrives while the worktree is not scanning.
    snapshots_to_send_tx: Option<Sender<Snapshot>>,
    // Most recent scan state.
    last_scan_state_rx: watch::Receiver<ScanState>,
    // Handle returned by `fsevent::EventStream::new`; `None` until `Worktree::local` sets it.
    _event_stream_handle: Option<fsevent::Handle>,
    // Whether a delayed `poll_snapshot` is currently pending.
    poll_scheduled: bool,
    // RPC client and worktree id, used to send `CloseWorktree` on release.
    rpc: Option<(rpc::Client, u64)>,
    // Weak handles to open buffers, keyed by buffer id.
    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
    // Strong handles to buffers opened by each peer, keyed by buffer id.
    shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
    // Replica id of each known peer.
    peers: HashMap<PeerId, ReplicaId>,
    languages: Arc<LanguageRegistry>,
    fs: Arc<dyn Fs>,
    // Delay between snapshot polls while scanning; zero means yield instead of sleeping.
    poll_interval: Duration,
}
949
950impl LocalWorktree {
    /// Creates a local worktree with an empty snapshot for `path`.
    ///
    /// Returns the worktree together with the sender on which scan-state
    /// updates are to be reported. A detached task forwards each received
    /// state to the worktree and polls its snapshot, exiting once the model
    /// can no longer be upgraded or the channel is closed.
    fn new(
        path: impl Into<Arc<Path>>,
        languages: Arc<LanguageRegistry>,
        fs: Arc<dyn Fs>,
        poll_interval: Duration,
        cx: &mut ModelContext<Worktree>,
    ) -> (Self, Sender<ScanState>) {
        let abs_path = path.into();
        let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
        // The worktree starts out in the `Scanning` state.
        let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
        let id = cx.model_id();
        let snapshot = Snapshot {
            id,
            scan_id: 0,
            abs_path,
            root_name: Default::default(),
            root_char_bag: Default::default(),
            ignores: Default::default(),
            entries_by_path: Default::default(),
            entries_by_id: Default::default(),
            removed_entry_ids: Default::default(),
            next_entry_id: Default::default(),
        };

        let tree = Self {
            snapshot: snapshot.clone(),
            background_snapshot: Arc::new(Mutex::new(snapshot)),
            snapshots_to_send_tx: None,
            last_scan_state_rx,
            _event_stream_handle: None,
            poll_scheduled: false,
            open_buffers: Default::default(),
            shared_buffers: Default::default(),
            peers: Default::default(),
            rpc: None,
            languages,
            fs,
            poll_interval,
        };

        cx.spawn_weak(|this, mut cx| async move {
            while let Ok(scan_state) = scan_states_rx.recv().await {
                if let Some(handle) = cx.read(|cx| this.upgrade(&cx)) {
                    handle.update(&mut cx, |this, cx| {
                        // Publish the new state before polling, so `is_scanning` sees it.
                        last_scan_state_tx.blocking_send(scan_state).ok();
                        this.poll_snapshot(cx);
                        let tree = this.as_local_mut().unwrap();
                        // When not scanning, hand the current snapshot to the
                        // snapshot sender, if one is set.
                        if !tree.is_scanning() {
                            if let Some(snapshots_to_send_tx) = tree.snapshots_to_send_tx.clone() {
                                if let Err(err) =
                                    smol::block_on(snapshots_to_send_tx.send(tree.snapshot()))
                                {
                                    log::error!("error submitting snapshot to send {}", err);
                                }
                            }
                        }
                    });
                } else {
                    break;
                }
            }
        })
        .detach();

        (tree, scan_states_tx)
    }
1017
1018 pub fn open_buffer(
1019 &mut self,
1020 path: &Path,
1021 cx: &mut ModelContext<Worktree>,
1022 ) -> Task<Result<ModelHandle<Buffer>>> {
1023 let handle = cx.handle();
1024
1025 // If there is already a buffer for the given path, then return it.
1026 let mut existing_buffer = None;
1027 self.open_buffers.retain(|_buffer_id, buffer| {
1028 if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
1029 if let Some(file) = buffer.read(cx.as_ref()).file() {
1030 if file.worktree_id() == handle.id() && file.path.as_ref() == path {
1031 existing_buffer = Some(buffer);
1032 }
1033 }
1034 true
1035 } else {
1036 false
1037 }
1038 });
1039
1040 let languages = self.languages.clone();
1041 let path = Arc::from(path);
1042 cx.spawn(|this, mut cx| async move {
1043 if let Some(existing_buffer) = existing_buffer {
1044 Ok(existing_buffer)
1045 } else {
1046 let (file, contents) = this
1047 .update(&mut cx, |this, cx| this.as_local().unwrap().load(&path, cx))
1048 .await?;
1049 let language = languages.select_language(&path).cloned();
1050 let buffer = cx.add_model(|cx| {
1051 Buffer::from_history(0, History::new(contents.into()), Some(file), language, cx)
1052 });
1053 this.update(&mut cx, |this, _| {
1054 let this = this
1055 .as_local_mut()
1056 .ok_or_else(|| anyhow!("must be a local worktree"))?;
1057 this.open_buffers.insert(buffer.id(), buffer.downgrade());
1058 Ok(buffer)
1059 })
1060 }
1061 })
1062 }
1063
1064 pub fn open_remote_buffer(
1065 &mut self,
1066 envelope: TypedEnvelope<proto::OpenBuffer>,
1067 cx: &mut ModelContext<Worktree>,
1068 ) -> Task<Result<proto::OpenBufferResponse>> {
1069 let peer_id = envelope.original_sender_id();
1070 let path = Path::new(&envelope.payload.path);
1071
1072 let buffer = self.open_buffer(path, cx);
1073
1074 cx.spawn(|this, mut cx| async move {
1075 let buffer = buffer.await?;
1076 this.update(&mut cx, |this, cx| {
1077 this.as_local_mut()
1078 .unwrap()
1079 .shared_buffers
1080 .entry(peer_id?)
1081 .or_default()
1082 .insert(buffer.id() as u64, buffer.clone());
1083
1084 Ok(proto::OpenBufferResponse {
1085 buffer: Some(buffer.update(cx.as_mut(), |buffer, cx| buffer.to_proto(cx))),
1086 })
1087 })
1088 })
1089 }
1090
1091 pub fn close_remote_buffer(
1092 &mut self,
1093 envelope: TypedEnvelope<proto::CloseBuffer>,
1094 _: &mut ModelContext<Worktree>,
1095 ) -> Result<()> {
1096 if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
1097 shared_buffers.remove(&envelope.payload.buffer_id);
1098 }
1099
1100 Ok(())
1101 }
1102
1103 pub fn add_peer(
1104 &mut self,
1105 envelope: TypedEnvelope<proto::AddPeer>,
1106 cx: &mut ModelContext<Worktree>,
1107 ) -> Result<()> {
1108 let peer = envelope.payload.peer.ok_or_else(|| anyhow!("empty peer"))?;
1109 self.peers
1110 .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
1111 cx.notify();
1112 Ok(())
1113 }
1114
1115 pub fn remove_peer(
1116 &mut self,
1117 envelope: TypedEnvelope<proto::RemovePeer>,
1118 cx: &mut ModelContext<Worktree>,
1119 ) -> Result<()> {
1120 let peer_id = PeerId(envelope.payload.peer_id);
1121 let replica_id = self
1122 .peers
1123 .remove(&peer_id)
1124 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
1125 self.shared_buffers.remove(&peer_id);
1126 for (_, buffer) in &self.open_buffers {
1127 if let Some(buffer) = buffer.upgrade(&cx) {
1128 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
1129 }
1130 }
1131 cx.notify();
1132 Ok(())
1133 }
1134
1135 pub fn scan_complete(&self) -> impl Future<Output = ()> {
1136 let mut scan_state_rx = self.last_scan_state_rx.clone();
1137 async move {
1138 let mut scan_state = Some(scan_state_rx.borrow().clone());
1139 while let Some(ScanState::Scanning) = scan_state {
1140 scan_state = scan_state_rx.recv().await;
1141 }
1142 }
1143 }
1144
1145 fn is_scanning(&self) -> bool {
1146 if let ScanState::Scanning = *self.last_scan_state_rx.borrow() {
1147 true
1148 } else {
1149 false
1150 }
1151 }
1152
1153 pub fn snapshot(&self) -> Snapshot {
1154 self.snapshot.clone()
1155 }
1156
1157 pub fn abs_path(&self) -> &Path {
1158 self.snapshot.abs_path.as_ref()
1159 }
1160
1161 pub fn contains_abs_path(&self, path: &Path) -> bool {
1162 path.starts_with(&self.snapshot.abs_path)
1163 }
1164
1165 fn absolutize(&self, path: &Path) -> PathBuf {
1166 if path.file_name().is_some() {
1167 self.snapshot.abs_path.join(path)
1168 } else {
1169 self.snapshot.abs_path.to_path_buf()
1170 }
1171 }
1172
1173 fn load(&self, path: &Path, cx: &mut ModelContext<Worktree>) -> Task<Result<(File, String)>> {
1174 let handle = cx.handle();
1175 let path = Arc::from(path);
1176 let abs_path = self.absolutize(&path);
1177 let background_snapshot = self.background_snapshot.clone();
1178 let fs = self.fs.clone();
1179 cx.spawn(|this, mut cx| async move {
1180 let text = fs.load(&abs_path).await?;
1181 // Eagerly populate the snapshot with an updated entry for the loaded file
1182 let entry = refresh_entry(fs.as_ref(), &background_snapshot, path, &abs_path).await?;
1183 this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
1184 Ok((File::new(entry.id, handle, entry.path, entry.mtime), text))
1185 })
1186 }
1187
1188 pub fn save_buffer_as(
1189 &self,
1190 buffer: ModelHandle<Buffer>,
1191 path: impl Into<Arc<Path>>,
1192 text: Rope,
1193 cx: &mut ModelContext<Worktree>,
1194 ) -> Task<Result<File>> {
1195 let save = self.save(path, text, cx);
1196 cx.spawn(|this, mut cx| async move {
1197 let entry = save.await?;
1198 this.update(&mut cx, |this, cx| {
1199 this.as_local_mut()
1200 .unwrap()
1201 .open_buffers
1202 .insert(buffer.id(), buffer.downgrade());
1203 Ok(File::new(entry.id, cx.handle(), entry.path, entry.mtime))
1204 })
1205 })
1206 }
1207
1208 fn save(
1209 &self,
1210 path: impl Into<Arc<Path>>,
1211 text: Rope,
1212 cx: &mut ModelContext<Worktree>,
1213 ) -> Task<Result<Entry>> {
1214 let path = path.into();
1215 let abs_path = self.absolutize(&path);
1216 let background_snapshot = self.background_snapshot.clone();
1217 let fs = self.fs.clone();
1218 let save = cx.background().spawn(async move {
1219 fs.save(&abs_path, &text).await?;
1220 refresh_entry(fs.as_ref(), &background_snapshot, path.clone(), &abs_path).await
1221 });
1222
1223 cx.spawn(|this, mut cx| async move {
1224 let entry = save.await?;
1225 this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
1226 Ok(entry)
1227 })
1228 }
1229
1230 pub fn share(
1231 &mut self,
1232 rpc: rpc::Client,
1233 cx: &mut ModelContext<Worktree>,
1234 ) -> Task<anyhow::Result<(u64, String)>> {
1235 let snapshot = self.snapshot();
1236 let share_request = self.share_request(cx);
1237 let handle = cx.handle();
1238 cx.spawn(|this, mut cx| async move {
1239 let share_request = share_request.await;
1240 let share_response = rpc.request(share_request).await?;
1241
1242 rpc.state
1243 .write()
1244 .await
1245 .shared_worktrees
1246 .insert(share_response.worktree_id, handle.downgrade());
1247
1248 log::info!("sharing worktree {:?}", share_response);
1249 let (snapshots_to_send_tx, snapshots_to_send_rx) =
1250 smol::channel::unbounded::<Snapshot>();
1251
1252 cx.background()
1253 .spawn({
1254 let rpc = rpc.clone();
1255 let worktree_id = share_response.worktree_id;
1256 async move {
1257 let mut prev_snapshot = snapshot;
1258 while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
1259 let message = snapshot.build_update(&prev_snapshot, worktree_id);
1260 match rpc.send(message).await {
1261 Ok(()) => prev_snapshot = snapshot,
1262 Err(err) => log::error!("error sending snapshot diff {}", err),
1263 }
1264 }
1265 }
1266 })
1267 .detach();
1268
1269 this.update(&mut cx, |worktree, _| {
1270 let worktree = worktree.as_local_mut().unwrap();
1271 worktree.rpc = Some((rpc, share_response.worktree_id));
1272 worktree.snapshots_to_send_tx = Some(snapshots_to_send_tx);
1273 });
1274
1275 Ok((share_response.worktree_id, share_response.access_token))
1276 })
1277 }
1278
1279 fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<proto::ShareWorktree> {
1280 let snapshot = self.snapshot();
1281 let root_name = self.root_name.clone();
1282 cx.background().spawn(async move {
1283 let entries = snapshot
1284 .entries_by_path
1285 .cursor::<(), ()>()
1286 .map(Into::into)
1287 .collect();
1288 proto::ShareWorktree {
1289 worktree: Some(proto::Worktree { root_name, entries }),
1290 }
1291 })
1292 }
1293}
1294
1295pub fn refresh_buffer(abs_path: PathBuf, fs: &Arc<dyn Fs>, cx: &mut ModelContext<Buffer>) {
1296 let fs = fs.clone();
1297 cx.spawn(|buffer, mut cx| async move {
1298 let new_text = fs.load(&abs_path).await;
1299 match new_text {
1300 Err(error) => log::error!("error refreshing buffer after file changed: {}", error),
1301 Ok(new_text) => {
1302 buffer
1303 .update(&mut cx, |buffer, cx| {
1304 buffer.set_text_from_disk(new_text.into(), cx)
1305 })
1306 .await;
1307 }
1308 }
1309 })
1310 .detach()
1311}
1312
1313impl Deref for LocalWorktree {
1314 type Target = Snapshot;
1315
1316 fn deref(&self) -> &Self::Target {
1317 &self.snapshot
1318 }
1319}
1320
1321impl fmt::Debug for LocalWorktree {
1322 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1323 self.snapshot.fmt(f)
1324 }
1325}
1326
/// State kept for a worktree that is accessed through an RPC client.
pub struct RemoteWorktree {
    /// Id sent as `worktree_id` in the requests and messages issued for this worktree.
    remote_id: u64,
    /// Snapshot of the worktree's entries; cloned by `snapshot()`.
    snapshot: Snapshot,
    // NOTE(review): presumably delivers snapshots produced after applying remote updates;
    // not used in this part of the file — confirm against the constructor.
    snapshot_rx: watch::Receiver<Snapshot>,
    /// Client used for requests such as `OpenBuffer`.
    rpc: rpc::Client,
    // NOTE(review): presumably feeds incoming `UpdateWorktree` messages to a background
    // task; not used in this part of the file — confirm.
    updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
    /// Replica id passed to `Buffer::from_proto` when buffers are opened.
    replica_id: ReplicaId,
    /// Weak handles to opened buffers; `open_buffer` inserts them under the remote buffer id.
    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
    /// Replica id of each known peer.
    peers: HashMap<PeerId, ReplicaId>,
    /// Registry used to select a language for an opened path.
    languages: Arc<LanguageRegistry>,
}
1338
1339impl RemoteWorktree {
1340 pub fn open_buffer(
1341 &mut self,
1342 path: &Path,
1343 cx: &mut ModelContext<Worktree>,
1344 ) -> Task<Result<ModelHandle<Buffer>>> {
1345 let handle = cx.handle();
1346 let mut existing_buffer = None;
1347 self.open_buffers.retain(|_buffer_id, buffer| {
1348 if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
1349 if let Some(file) = buffer.read(cx.as_ref()).file() {
1350 if file.worktree_id() == handle.id() && file.path.as_ref() == path {
1351 existing_buffer = Some(buffer);
1352 }
1353 }
1354 true
1355 } else {
1356 false
1357 }
1358 });
1359
1360 let rpc = self.rpc.clone();
1361 let languages = self.languages.clone();
1362 let replica_id = self.replica_id;
1363 let remote_worktree_id = self.remote_id;
1364 let path = path.to_string_lossy().to_string();
1365 cx.spawn(|this, mut cx| async move {
1366 if let Some(existing_buffer) = existing_buffer {
1367 Ok(existing_buffer)
1368 } else {
1369 let entry = this
1370 .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned())
1371 .ok_or_else(|| anyhow!("file does not exist"))?;
1372 let file = File::new(entry.id, handle, entry.path, entry.mtime);
1373 let language = languages.select_language(&path).cloned();
1374 let response = rpc
1375 .request(proto::OpenBuffer {
1376 worktree_id: remote_worktree_id as u64,
1377 path,
1378 })
1379 .await?;
1380 let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
1381 let buffer_id = remote_buffer.id;
1382 let buffer = cx.add_model(|cx| {
1383 Buffer::from_proto(replica_id, remote_buffer, Some(file), language, cx).unwrap()
1384 });
1385 this.update(&mut cx, |this, _| {
1386 let this = this.as_remote_mut().unwrap();
1387 this.open_buffers
1388 .insert(buffer_id as usize, buffer.downgrade());
1389 });
1390 Ok(buffer)
1391 }
1392 })
1393 }
1394
1395 fn snapshot(&self) -> Snapshot {
1396 self.snapshot.clone()
1397 }
1398
1399 pub fn add_peer(
1400 &mut self,
1401 envelope: TypedEnvelope<proto::AddPeer>,
1402 cx: &mut ModelContext<Worktree>,
1403 ) -> Result<()> {
1404 let peer = envelope.payload.peer.ok_or_else(|| anyhow!("empty peer"))?;
1405 self.peers
1406 .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
1407 cx.notify();
1408 Ok(())
1409 }
1410
1411 pub fn remove_peer(
1412 &mut self,
1413 envelope: TypedEnvelope<proto::RemovePeer>,
1414 cx: &mut ModelContext<Worktree>,
1415 ) -> Result<()> {
1416 let peer_id = PeerId(envelope.payload.peer_id);
1417 let replica_id = self
1418 .peers
1419 .remove(&peer_id)
1420 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
1421 for (_, buffer) in &self.open_buffers {
1422 if let Some(buffer) = buffer.upgrade(&cx) {
1423 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
1424 }
1425 }
1426 cx.notify();
1427 Ok(())
1428 }
1429}
1430
/// A point-in-time view of a worktree's entries, indexed both by path and by entry id.
#[derive(Clone)]
pub struct Snapshot {
    id: usize,
    /// Counter bumped at the start of each scan, batch of file-system events, or applied
    /// remote update; entries and ignore files record the value at which they last changed.
    scan_id: usize,
    /// Absolute path of the worktree's root.
    abs_path: Arc<Path>,
    /// File name of the root, with a trailing slash when the root is a directory.
    root_name: String,
    /// Lower-cased characters of the root's file name, as computed by the scanner.
    root_char_bag: CharBag,
    /// Parsed `.gitignore` for each directory that has one, keyed by the directory's path and
    /// paired with the `scan_id` at which it was last recorded or invalidated.
    ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
    /// All entries, keyed by path.
    entries_by_path: SumTree<Entry>,
    /// Path and last-changed `scan_id` of every entry, keyed by entry id.
    entries_by_id: SumTree<PathEntry>,
    /// Entry ids of removed paths, keyed by inode, so that an entry re-inserted with the
    /// same inode can get its previous id back.
    removed_entry_ids: HashMap<u64, usize>,
    // NOTE(review): handed to `Fs::entry`/`Fs::child_entries`, presumably to allocate ids
    // for new entries — confirm against the `Fs` implementations.
    next_entry_id: Arc<AtomicUsize>,
}
1444
1445impl Snapshot {
1446 pub fn build_update(&self, other: &Self, worktree_id: u64) -> proto::UpdateWorktree {
1447 let mut updated_entries = Vec::new();
1448 let mut removed_entries = Vec::new();
1449 let mut self_entries = self.entries_by_id.cursor::<(), ()>().peekable();
1450 let mut other_entries = other.entries_by_id.cursor::<(), ()>().peekable();
1451 loop {
1452 match (self_entries.peek(), other_entries.peek()) {
1453 (Some(self_entry), Some(other_entry)) => match self_entry.id.cmp(&other_entry.id) {
1454 Ordering::Less => {
1455 let entry = self.entry_for_id(self_entry.id).unwrap().into();
1456 updated_entries.push(entry);
1457 self_entries.next();
1458 }
1459 Ordering::Equal => {
1460 if self_entry.scan_id != other_entry.scan_id {
1461 let entry = self.entry_for_id(self_entry.id).unwrap().into();
1462 updated_entries.push(entry);
1463 }
1464
1465 self_entries.next();
1466 other_entries.next();
1467 }
1468 Ordering::Greater => {
1469 removed_entries.push(other_entry.id as u64);
1470 other_entries.next();
1471 }
1472 },
1473 (Some(self_entry), None) => {
1474 let entry = self.entry_for_id(self_entry.id).unwrap().into();
1475 updated_entries.push(entry);
1476 self_entries.next();
1477 }
1478 (None, Some(other_entry)) => {
1479 removed_entries.push(other_entry.id as u64);
1480 other_entries.next();
1481 }
1482 (None, None) => break,
1483 }
1484 }
1485
1486 proto::UpdateWorktree {
1487 updated_entries,
1488 removed_entries,
1489 worktree_id,
1490 }
1491 }
1492
1493 fn apply_update(&mut self, update: proto::UpdateWorktree) -> Result<()> {
1494 self.scan_id += 1;
1495 let scan_id = self.scan_id;
1496
1497 let mut entries_by_path_edits = Vec::new();
1498 let mut entries_by_id_edits = Vec::new();
1499 for entry_id in update.removed_entries {
1500 let entry_id = entry_id as usize;
1501 let entry = self
1502 .entry_for_id(entry_id)
1503 .ok_or_else(|| anyhow!("unknown entry"))?;
1504 entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
1505 entries_by_id_edits.push(Edit::Remove(entry.id));
1506 }
1507
1508 for entry in update.updated_entries {
1509 let entry = Entry::try_from((&self.root_char_bag, entry))?;
1510 if let Some(PathEntry { path, .. }) = self.entries_by_id.get(&entry.id, &()) {
1511 entries_by_path_edits.push(Edit::Remove(PathKey(path.clone())));
1512 }
1513 entries_by_id_edits.push(Edit::Insert(PathEntry {
1514 id: entry.id,
1515 path: entry.path.clone(),
1516 scan_id,
1517 }));
1518 entries_by_path_edits.push(Edit::Insert(entry));
1519 }
1520
1521 self.entries_by_path.edit(entries_by_path_edits, &());
1522 self.entries_by_id.edit(entries_by_id_edits, &());
1523
1524 Ok(())
1525 }
1526
1527 pub fn file_count(&self) -> usize {
1528 self.entries_by_path.summary().file_count
1529 }
1530
1531 pub fn visible_file_count(&self) -> usize {
1532 self.entries_by_path.summary().visible_file_count
1533 }
1534
1535 pub fn files(&self, start: usize) -> FileIter {
1536 FileIter::all(self, start)
1537 }
1538
1539 pub fn paths(&self) -> impl Iterator<Item = &Arc<Path>> {
1540 let empty_path = Path::new("");
1541 self.entries_by_path
1542 .cursor::<(), ()>()
1543 .filter(move |entry| entry.path.as_ref() != empty_path)
1544 .map(|entry| entry.path())
1545 }
1546
1547 pub fn visible_files(&self, start: usize) -> FileIter {
1548 FileIter::visible(self, start)
1549 }
1550
1551 fn child_entries<'a>(&'a self, path: &'a Path) -> ChildEntriesIter<'a> {
1552 ChildEntriesIter::new(path, self)
1553 }
1554
1555 pub fn root_entry(&self) -> &Entry {
1556 self.entry_for_path("").unwrap()
1557 }
1558
1559 /// Returns the filename of the snapshot's root, plus a trailing slash if the snapshot's root is
1560 /// a directory.
1561 pub fn root_name(&self) -> &str {
1562 &self.root_name
1563 }
1564
1565 fn entry_for_path(&self, path: impl AsRef<Path>) -> Option<&Entry> {
1566 let mut cursor = self.entries_by_path.cursor::<_, ()>();
1567 if cursor.seek(&PathSearch::Exact(path.as_ref()), Bias::Left, &()) {
1568 cursor.item()
1569 } else {
1570 None
1571 }
1572 }
1573
1574 fn entry_for_id(&self, id: usize) -> Option<&Entry> {
1575 let entry = self.entries_by_id.get(&id, &())?;
1576 self.entry_for_path(&entry.path)
1577 }
1578
1579 pub fn inode_for_path(&self, path: impl AsRef<Path>) -> Option<u64> {
1580 self.entry_for_path(path.as_ref()).map(|e| e.inode())
1581 }
1582
1583 fn insert_entry(&mut self, mut entry: Entry) -> Entry {
1584 if !entry.is_dir() && entry.path().file_name() == Some(&GITIGNORE) {
1585 let (ignore, err) = Gitignore::new(self.abs_path.join(entry.path()));
1586 if let Some(err) = err {
1587 log::error!("error in ignore file {:?} - {:?}", entry.path(), err);
1588 }
1589
1590 let ignore_dir_path = entry.path().parent().unwrap();
1591 self.ignores
1592 .insert(ignore_dir_path.into(), (Arc::new(ignore), self.scan_id));
1593 }
1594
1595 self.reuse_entry_id(&mut entry);
1596 self.entries_by_path.insert_or_replace(entry.clone(), &());
1597 self.entries_by_id.insert_or_replace(
1598 PathEntry {
1599 id: entry.id,
1600 path: entry.path.clone(),
1601 scan_id: self.scan_id,
1602 },
1603 &(),
1604 );
1605 entry
1606 }
1607
1608 fn populate_dir(
1609 &mut self,
1610 parent_path: Arc<Path>,
1611 entries: impl IntoIterator<Item = Entry>,
1612 ignore: Option<Arc<Gitignore>>,
1613 ) {
1614 let mut parent_entry = self
1615 .entries_by_path
1616 .get(&PathKey(parent_path.clone()), &())
1617 .unwrap()
1618 .clone();
1619 if let Some(ignore) = ignore {
1620 self.ignores.insert(parent_path, (ignore, self.scan_id));
1621 }
1622 if matches!(parent_entry.kind, EntryKind::PendingDir) {
1623 parent_entry.kind = EntryKind::Dir;
1624 } else {
1625 unreachable!();
1626 }
1627
1628 let mut entries_by_path_edits = vec![Edit::Insert(parent_entry)];
1629 let mut entries_by_id_edits = Vec::new();
1630
1631 for mut entry in entries {
1632 self.reuse_entry_id(&mut entry);
1633 entries_by_id_edits.push(Edit::Insert(PathEntry {
1634 id: entry.id,
1635 path: entry.path.clone(),
1636 scan_id: self.scan_id,
1637 }));
1638 entries_by_path_edits.push(Edit::Insert(entry));
1639 }
1640
1641 self.entries_by_path.edit(entries_by_path_edits, &());
1642 self.entries_by_id.edit(entries_by_id_edits, &());
1643 }
1644
1645 fn reuse_entry_id(&mut self, entry: &mut Entry) {
1646 if let Some(removed_entry_id) = self.removed_entry_ids.remove(&entry.inode) {
1647 entry.id = removed_entry_id;
1648 } else if let Some(existing_entry) = self.entry_for_path(&entry.path) {
1649 entry.id = existing_entry.id;
1650 }
1651 }
1652
1653 fn remove_path(&mut self, path: &Path) {
1654 let mut new_entries;
1655 let removed_entry_ids;
1656 {
1657 let mut cursor = self.entries_by_path.cursor::<_, ()>();
1658 new_entries = cursor.slice(&PathSearch::Exact(path), Bias::Left, &());
1659 removed_entry_ids = cursor.slice(&PathSearch::Successor(path), Bias::Left, &());
1660 new_entries.push_tree(cursor.suffix(&()), &());
1661 }
1662 self.entries_by_path = new_entries;
1663
1664 let mut entries_by_id_edits = Vec::new();
1665 for entry in removed_entry_ids.cursor::<(), ()>() {
1666 let removed_entry_id = self
1667 .removed_entry_ids
1668 .entry(entry.inode)
1669 .or_insert(entry.id);
1670 *removed_entry_id = cmp::max(*removed_entry_id, entry.id);
1671 entries_by_id_edits.push(Edit::Remove(entry.id));
1672 }
1673 self.entries_by_id.edit(entries_by_id_edits, &());
1674
1675 if path.file_name() == Some(&GITIGNORE) {
1676 if let Some((_, scan_id)) = self.ignores.get_mut(path.parent().unwrap()) {
1677 *scan_id = self.scan_id;
1678 }
1679 }
1680 }
1681
1682 fn ignore_stack_for_path(&self, path: &Path, is_dir: bool) -> Arc<IgnoreStack> {
1683 let mut new_ignores = Vec::new();
1684 for ancestor in path.ancestors().skip(1) {
1685 if let Some((ignore, _)) = self.ignores.get(ancestor) {
1686 new_ignores.push((ancestor, Some(ignore.clone())));
1687 } else {
1688 new_ignores.push((ancestor, None));
1689 }
1690 }
1691
1692 let mut ignore_stack = IgnoreStack::none();
1693 for (parent_path, ignore) in new_ignores.into_iter().rev() {
1694 if ignore_stack.is_path_ignored(&parent_path, true) {
1695 ignore_stack = IgnoreStack::all();
1696 break;
1697 } else if let Some(ignore) = ignore {
1698 ignore_stack = ignore_stack.append(Arc::from(parent_path), ignore);
1699 }
1700 }
1701
1702 if ignore_stack.is_path_ignored(path, is_dir) {
1703 ignore_stack = IgnoreStack::all();
1704 }
1705
1706 ignore_stack
1707 }
1708}
1709
1710impl fmt::Debug for Snapshot {
1711 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1712 for entry in self.entries_by_path.cursor::<(), ()>() {
1713 for _ in entry.path().ancestors().skip(1) {
1714 write!(f, " ")?;
1715 }
1716 writeln!(f, "{:?} (inode: {})", entry.path(), entry.inode())?;
1717 }
1718 Ok(())
1719 }
1720}
1721
/// A handle to a file within a worktree.
#[derive(Clone, PartialEq)]
pub struct File {
    /// Id of the worktree entry backing this file; `None` means the file is deleted
    /// (see `is_deleted`).
    entry_id: Option<usize>,
    /// The worktree this file belongs to.
    worktree: ModelHandle<Worktree>,
    /// Path of the file relative to the root of its worktree.
    pub path: Arc<Path>,
    /// Modification time taken from the entry this handle was created from.
    pub mtime: SystemTime,
}
1729
1730impl File {
1731 pub fn new(
1732 entry_id: usize,
1733 worktree: ModelHandle<Worktree>,
1734 path: Arc<Path>,
1735 mtime: SystemTime,
1736 ) -> Self {
1737 Self {
1738 entry_id: Some(entry_id),
1739 worktree,
1740 path,
1741 mtime,
1742 }
1743 }
1744
1745 pub fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) {
1746 self.worktree.update(cx, |worktree, cx| {
1747 if let Some((rpc, remote_id)) = match worktree {
1748 Worktree::Local(worktree) => worktree.rpc.clone(),
1749 Worktree::Remote(worktree) => Some((worktree.rpc.clone(), worktree.remote_id)),
1750 } {
1751 cx.spawn(|_, _| async move {
1752 if let Err(error) = rpc
1753 .send(proto::UpdateBuffer {
1754 worktree_id: remote_id,
1755 buffer_id,
1756 operations: Some(operation).iter().map(Into::into).collect(),
1757 })
1758 .await
1759 {
1760 log::error!("error sending buffer operation: {}", error);
1761 }
1762 })
1763 .detach();
1764 }
1765 });
1766 }
1767
1768 pub fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) {
1769 self.worktree.update(cx, |worktree, cx| {
1770 if let Worktree::Remote(worktree) = worktree {
1771 let worktree_id = worktree.remote_id;
1772 let rpc = worktree.rpc.clone();
1773 cx.background()
1774 .spawn(async move {
1775 if let Err(error) = rpc
1776 .send(proto::CloseBuffer {
1777 worktree_id,
1778 buffer_id,
1779 })
1780 .await
1781 {
1782 log::error!("error closing remote buffer: {}", error);
1783 };
1784 })
1785 .detach();
1786 }
1787 });
1788 }
1789
1790 /// Returns this file's path relative to the root of its worktree.
1791 pub fn path(&self) -> Arc<Path> {
1792 self.path.clone()
1793 }
1794
1795 pub fn abs_path(&self, cx: &AppContext) -> PathBuf {
1796 self.worktree.read(cx).abs_path.join(&self.path)
1797 }
1798
1799 /// Returns the last component of this handle's absolute path. If this handle refers to the root
1800 /// of its worktree, then this method will return the name of the worktree itself.
1801 pub fn file_name<'a>(&'a self, cx: &'a AppContext) -> Option<OsString> {
1802 self.path
1803 .file_name()
1804 .or_else(|| Some(OsStr::new(self.worktree.read(cx).root_name())))
1805 .map(Into::into)
1806 }
1807
1808 pub fn is_deleted(&self) -> bool {
1809 self.entry_id.is_none()
1810 }
1811
1812 pub fn exists(&self) -> bool {
1813 !self.is_deleted()
1814 }
1815
1816 pub fn save(
1817 &self,
1818 buffer_id: u64,
1819 text: Rope,
1820 version: time::Global,
1821 cx: &mut MutableAppContext,
1822 ) -> Task<Result<(time::Global, SystemTime)>> {
1823 self.worktree.update(cx, |worktree, cx| match worktree {
1824 Worktree::Local(worktree) => {
1825 let rpc = worktree.rpc.clone();
1826 let save = worktree.save(self.path.clone(), text, cx);
1827 cx.spawn(|_, _| async move {
1828 let entry = save.await?;
1829 if let Some((rpc, worktree_id)) = rpc {
1830 rpc.send(proto::BufferSaved {
1831 worktree_id,
1832 buffer_id,
1833 version: (&version).into(),
1834 mtime: Some(entry.mtime.into()),
1835 })
1836 .await?;
1837 }
1838 Ok((version, entry.mtime))
1839 })
1840 }
1841 Worktree::Remote(worktree) => {
1842 let rpc = worktree.rpc.clone();
1843 let worktree_id = worktree.remote_id;
1844 cx.spawn(|_, _| async move {
1845 let response = rpc
1846 .request(proto::SaveBuffer {
1847 worktree_id,
1848 buffer_id,
1849 })
1850 .await?;
1851 let version = response.version.try_into()?;
1852 let mtime = response
1853 .mtime
1854 .ok_or_else(|| anyhow!("missing mtime"))?
1855 .into();
1856 Ok((version, mtime))
1857 })
1858 }
1859 })
1860 }
1861
1862 pub fn worktree_id(&self) -> usize {
1863 self.worktree.id()
1864 }
1865
1866 pub fn entry_id(&self) -> (usize, Arc<Path>) {
1867 (self.worktree.id(), self.path())
1868 }
1869}
1870
/// A single file or directory recorded in a worktree snapshot.
#[derive(Clone, Debug)]
pub struct Entry {
    /// Identifier under which the entry is indexed in `entries_by_id`; reused across
    /// re-insertions where possible (see `Snapshot::reuse_entry_id`).
    id: usize,
    /// Whether this is a file, a scanned directory, or a directory awaiting its scan.
    kind: EntryKind,
    /// Path of the entry; the worktree root itself has the empty path.
    path: Arc<Path>,
    /// Inode of the entry, used to match re-created entries to previous ids.
    inode: u64,
    /// Modification time of the entry.
    mtime: SystemTime,
    is_symlink: bool,
    /// Whether the entry is matched by the ignore stack that applies to it.
    is_ignored: bool,
}
1881
/// The kind of a worktree entry.
#[derive(Clone, Debug)]
pub enum EntryKind {
    /// A directory whose children have not been recorded yet; `Snapshot::populate_dir`
    /// turns it into `Dir`.
    PendingDir,
    /// A directory whose children have been recorded.
    Dir,
    /// A file, together with a `CharBag`.
    // NOTE(review): presumably the bag of characters of the file's path, used for fuzzy
    // matching — confirm against where entries are constructed.
    File(CharBag),
}
1888
1889impl Entry {
1890 pub fn path(&self) -> &Arc<Path> {
1891 &self.path
1892 }
1893
1894 pub fn inode(&self) -> u64 {
1895 self.inode
1896 }
1897
1898 pub fn is_ignored(&self) -> bool {
1899 self.is_ignored
1900 }
1901
1902 fn is_dir(&self) -> bool {
1903 matches!(self.kind, EntryKind::Dir | EntryKind::PendingDir)
1904 }
1905
1906 fn is_file(&self) -> bool {
1907 matches!(self.kind, EntryKind::File(_))
1908 }
1909}
1910
1911impl sum_tree::Item for Entry {
1912 type Summary = EntrySummary;
1913
1914 fn summary(&self) -> Self::Summary {
1915 let file_count;
1916 let visible_file_count;
1917 if self.is_file() {
1918 file_count = 1;
1919 if self.is_ignored {
1920 visible_file_count = 0;
1921 } else {
1922 visible_file_count = 1;
1923 }
1924 } else {
1925 file_count = 0;
1926 visible_file_count = 0;
1927 }
1928
1929 EntrySummary {
1930 max_path: self.path().clone(),
1931 file_count,
1932 visible_file_count,
1933 }
1934 }
1935}
1936
1937impl sum_tree::KeyedItem for Entry {
1938 type Key = PathKey;
1939
1940 fn key(&self) -> Self::Key {
1941 PathKey(self.path().clone())
1942 }
1943}
1944
1945#[derive(Clone, Debug)]
1946pub struct EntrySummary {
1947 max_path: Arc<Path>,
1948 file_count: usize,
1949 visible_file_count: usize,
1950}
1951
1952impl Default for EntrySummary {
1953 fn default() -> Self {
1954 Self {
1955 max_path: Arc::from(Path::new("")),
1956 file_count: 0,
1957 visible_file_count: 0,
1958 }
1959 }
1960}
1961
1962impl sum_tree::Summary for EntrySummary {
1963 type Context = ();
1964
1965 fn add_summary(&mut self, rhs: &Self, _: &()) {
1966 self.max_path = rhs.max_path.clone();
1967 self.file_count += rhs.file_count;
1968 self.visible_file_count += rhs.visible_file_count;
1969 }
1970}
1971
1972#[derive(Clone, Debug)]
1973struct PathEntry {
1974 id: usize,
1975 path: Arc<Path>,
1976 scan_id: usize,
1977}
1978
1979impl sum_tree::Item for PathEntry {
1980 type Summary = PathEntrySummary;
1981
1982 fn summary(&self) -> Self::Summary {
1983 PathEntrySummary { max_id: self.id }
1984 }
1985}
1986
1987impl sum_tree::KeyedItem for PathEntry {
1988 type Key = usize;
1989
1990 fn key(&self) -> Self::Key {
1991 self.id
1992 }
1993}
1994
1995#[derive(Clone, Debug, Default)]
1996struct PathEntrySummary {
1997 max_id: usize,
1998}
1999
2000impl sum_tree::Summary for PathEntrySummary {
2001 type Context = ();
2002
2003 fn add_summary(&mut self, summary: &Self, _: &Self::Context) {
2004 self.max_id = summary.max_id;
2005 }
2006}
2007
2008impl<'a> sum_tree::Dimension<'a, PathEntrySummary> for usize {
2009 fn add_summary(&mut self, summary: &'a PathEntrySummary, _: &()) {
2010 *self = summary.max_id;
2011 }
2012}
2013
2014#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
2015pub struct PathKey(Arc<Path>);
2016
2017impl Default for PathKey {
2018 fn default() -> Self {
2019 Self(Path::new("").into())
2020 }
2021}
2022
2023impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
2024 fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
2025 self.0 = summary.max_path.clone();
2026 }
2027}
2028
/// Seek target for the path-ordered tree.
///
/// `Exact(p)` orders like the path `p`. `Successor(p)` orders after `p` and after every
/// path that starts with `p`, so seeking to it lands just past `p`'s subtree.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum PathSearch<'a> {
    Exact(&'a Path),
    Successor(&'a Path),
}

impl<'a> Ord for PathSearch<'a> {
    /// Compares a seek target (`self`) against a position (`other`).
    ///
    /// Only comparisons against an `Exact` position are implemented; comparing against a
    /// `Successor` panics.
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        let position = match other {
            Self::Exact(position) => *position,
            Self::Successor(_) => unreachable!("not sure we need the other two cases"),
        };
        match self {
            Self::Exact(path) => path.cmp(&position),
            // Anything inside the subtree rooted at `path` sorts before its successor.
            Self::Successor(path) if position.starts_with(path) => cmp::Ordering::Greater,
            Self::Successor(path) => path.cmp(&position),
        }
    }
}

impl<'a> PartialOrd for PathSearch<'a> {
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl<'a> Default for PathSearch<'a> {
    /// The default target is the exact empty path.
    fn default() -> Self {
        Self::Exact(Path::new(""))
    }
}
2062
2063impl<'a: 'b, 'b> sum_tree::Dimension<'a, EntrySummary> for PathSearch<'b> {
2064 fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
2065 *self = Self::Exact(summary.max_path.as_ref());
2066 }
2067}
2068
2069#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
2070pub struct FileCount(usize);
2071
2072impl<'a> sum_tree::Dimension<'a, EntrySummary> for FileCount {
2073 fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
2074 self.0 += summary.file_count;
2075 }
2076}
2077
2078#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
2079pub struct VisibleFileCount(usize);
2080
2081impl<'a> sum_tree::Dimension<'a, EntrySummary> for VisibleFileCount {
2082 fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
2083 self.0 += summary.visible_file_count;
2084 }
2085}
2086
/// Scans a worktree's directory tree and processes file-system events, writing the results
/// into a shared snapshot.
struct BackgroundScanner {
    /// File-system abstraction used to read entries and canonicalize paths.
    fs: Arc<dyn Fs>,
    /// The snapshot the scanner updates, shared behind a mutex.
    snapshot: Arc<Mutex<Snapshot>>,
    /// Channel on which `Scanning`, `Idle` and `Err` scan states are reported.
    notify: Sender<ScanState>,
    /// Background executor used to scan directories in parallel.
    executor: Arc<executor::Background>,
}
2093
2094impl BackgroundScanner {
2095 fn new(
2096 snapshot: Arc<Mutex<Snapshot>>,
2097 notify: Sender<ScanState>,
2098 fs: Arc<dyn Fs>,
2099 executor: Arc<executor::Background>,
2100 ) -> Self {
2101 Self {
2102 fs,
2103 snapshot,
2104 notify,
2105 executor,
2106 }
2107 }
2108
2109 fn abs_path(&self) -> Arc<Path> {
2110 self.snapshot.lock().abs_path.clone()
2111 }
2112
2113 fn snapshot(&self) -> Snapshot {
2114 self.snapshot.lock().clone()
2115 }
2116
2117 fn run(mut self, event_stream: fsevent::EventStream) {
2118 if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() {
2119 return;
2120 }
2121
2122 if let Err(err) = smol::block_on(self.scan_dirs()) {
2123 if smol::block_on(self.notify.send(ScanState::Err(Arc::new(err)))).is_err() {
2124 return;
2125 }
2126 }
2127
2128 if smol::block_on(self.notify.send(ScanState::Idle)).is_err() {
2129 return;
2130 }
2131
2132 event_stream.run(move |events| {
2133 if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() {
2134 return false;
2135 }
2136
2137 if !smol::block_on(self.process_events(events)) {
2138 return false;
2139 }
2140
2141 if smol::block_on(self.notify.send(ScanState::Idle)).is_err() {
2142 return false;
2143 }
2144
2145 true
2146 });
2147 }
2148
/// Async variant of `run` for tests: events come from a broadcast receiver instead of an
/// fsevent stream.
///
/// After the initial scan, each received event is batched together with every event that
/// is already queued, and the batch is processed between a `Scanning` and an `Idle`
/// notification. Stops when the receiver is exhausted, a state cannot be delivered, or
/// event processing reports that it should stop.
#[cfg(any(test, feature = "test-support"))]
async fn run_test(mut self, mut events_rx: postage::broadcast::Receiver<fsevent::Event>) {
    if self.notify.send(ScanState::Scanning).await.is_err() {
        return;
    }

    if let Err(err) = self.scan_dirs().await {
        let scan_error = ScanState::Err(Arc::new(err));
        if self.notify.send(scan_error).await.is_err() {
            return;
        }
    }

    if self.notify.send(ScanState::Idle).await.is_err() {
        return;
    }

    while let Some(first_event) = events_rx.recv().await {
        // Drain whatever else is already queued so the events are handled as one batch.
        let mut batch = vec![first_event];
        while let Ok(queued_event) = events_rx.try_recv() {
            batch.push(queued_event);
        }

        if self.notify.send(ScanState::Scanning).await.is_err() {
            return;
        }

        if !self.process_events(batch).await {
            return;
        }

        if self.notify.send(ScanState::Idle).await.is_err() {
            return;
        }
    }
}
2189
2190 async fn scan_dirs(&mut self) -> Result<()> {
2191 let next_entry_id;
2192 {
2193 let mut snapshot = self.snapshot.lock();
2194 snapshot.scan_id += 1;
2195 next_entry_id = snapshot.next_entry_id.clone();
2196 }
2197
2198 let path: Arc<Path> = Arc::from(Path::new(""));
2199 let abs_path = self.abs_path();
2200
2201 // After determining whether the root entry is a file or a directory, populate the
2202 // snapshot's "root name", which will be used for the purpose of fuzzy matching.
2203 let mut root_name = abs_path
2204 .file_name()
2205 .map_or(String::new(), |f| f.to_string_lossy().to_string());
2206 let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
2207 let entry = self
2208 .fs
2209 .entry(root_char_bag, &next_entry_id, path.clone(), &abs_path)
2210 .await?
2211 .ok_or_else(|| anyhow!("root entry does not exist"))?;
2212 let is_dir = entry.is_dir();
2213 if is_dir {
2214 root_name.push('/');
2215 }
2216
2217 {
2218 let mut snapshot = self.snapshot.lock();
2219 snapshot.root_name = root_name;
2220 snapshot.root_char_bag = root_char_bag;
2221 }
2222
2223 self.snapshot.lock().insert_entry(entry);
2224 if is_dir {
2225 let (tx, rx) = channel::unbounded();
2226 tx.send(ScanJob {
2227 abs_path: abs_path.to_path_buf(),
2228 path,
2229 ignore_stack: IgnoreStack::none(),
2230 scan_queue: tx.clone(),
2231 })
2232 .await
2233 .unwrap();
2234 drop(tx);
2235
2236 self.executor
2237 .scoped(|scope| {
2238 for _ in 0..self.executor.threads() {
2239 scope.spawn(async {
2240 while let Ok(job) = rx.recv().await {
2241 if let Err(err) = self
2242 .scan_dir(root_char_bag, next_entry_id.clone(), &job)
2243 .await
2244 {
2245 log::error!("error scanning {:?}: {}", job.abs_path, err);
2246 }
2247 }
2248 });
2249 }
2250 })
2251 .await;
2252 }
2253
2254 Ok(())
2255 }
2256
/// Scans the single directory described by `job`.
///
/// Reads the directory's children, determines for each whether it is ignored, records the
/// children (and the directory's `.gitignore`, if one was found) in the shared snapshot,
/// and enqueues one new job per child directory on the job's scan queue.
///
/// Children that fail to load are logged and skipped. Returns an error if the directory's
/// children cannot be listed at all.
async fn scan_dir(
    &self,
    root_char_bag: CharBag,
    next_entry_id: Arc<AtomicUsize>,
    job: &ScanJob,
) -> Result<()> {
    let mut new_entries: Vec<Entry> = Vec::new();
    // Holds exactly one job per directory in `new_entries`, in the same relative order.
    let mut new_jobs: Vec<ScanJob> = Vec::new();
    let mut ignore_stack = job.ignore_stack.clone();
    let mut new_ignore = None;

    let mut child_entries = self
        .fs
        .child_entries(
            root_char_bag,
            next_entry_id.as_ref(),
            &job.path,
            &job.abs_path,
        )
        .await?;
    while let Some(child_entry) = child_entries.next().await {
        let mut child_entry = match child_entry {
            Ok(child_entry) => child_entry,
            Err(error) => {
                log::error!("error processing entry {:?}", error);
                continue;
            }
        };
        let child_name = child_entry.path.file_name().unwrap();
        let child_abs_path = job.abs_path.join(&child_name);
        let child_path = child_entry.path.clone();

        // If we find a .gitignore, add it to the stack of ignores used to determine which paths are ignored
        if child_name == *GITIGNORE {
            let (ignore, err) = Gitignore::new(&child_abs_path);
            if let Some(err) = err {
                log::error!("error in ignore file {:?} - {:?}", child_entry.path, err);
            }
            let ignore = Arc::new(ignore);
            ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
            new_ignore = Some(ignore);

            // Update the ignore status of any child entries we've already processed to
            // reflect the ignore file in the current directory. Because `.gitignore`
            // starts with a `.`, there should rarely be many such entries. Update the
            // ignore stack associated with any new jobs as well.
            //
            // This shadows `new_jobs` with an iterator that is advanced once per directory
            // entry, relying on jobs and directory entries being in the same order.
            let mut new_jobs = new_jobs.iter_mut();
            for entry in &mut new_entries {
                entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
                if entry.is_dir() {
                    new_jobs.next().unwrap().ignore_stack = if entry.is_ignored {
                        IgnoreStack::all()
                    } else {
                        ignore_stack.clone()
                    };
                }
            }
        }

        if child_entry.is_dir() {
            let is_ignored = ignore_stack.is_path_ignored(&child_path, true);
            child_entry.is_ignored = is_ignored;
            new_entries.push(child_entry);
            // Everything below an ignored directory is ignored as well.
            new_jobs.push(ScanJob {
                abs_path: child_abs_path,
                path: child_path,
                ignore_stack: if is_ignored {
                    IgnoreStack::all()
                } else {
                    ignore_stack.clone()
                },
                scan_queue: job.scan_queue.clone(),
            });
        } else {
            child_entry.is_ignored = ignore_stack.is_path_ignored(&child_path, false);
            new_entries.push(child_entry);
        };
    }

    // Record the children before enqueueing jobs for them: `populate_dir` requires each
    // scanned directory to already have a pending entry in the snapshot.
    self.snapshot
        .lock()
        .populate_dir(job.path.clone(), new_entries, new_ignore);
    for new_job in new_jobs {
        job.scan_queue.send(new_job).await.unwrap();
    }

    Ok(())
}
2345
    /// Applies a batch of file-system events to the snapshot and rescans affected directories.
    ///
    /// Returns `false` if the worktree root cannot be canonicalized, and `true` once the batch
    /// has been processed.
    async fn process_events(&mut self, mut events: Vec<fsevent::Event>) -> bool {
        // Work on the value returned by `self.snapshot()`; it is stored back into the shared
        // mutex further down, before directory scanning starts.
        let mut snapshot = self.snapshot();
        snapshot.scan_id += 1;

        let root_abs_path = if let Ok(abs_path) = self.fs.canonicalize(&snapshot.abs_path).await {
            abs_path
        } else {
            return false;
        };
        let root_char_bag = snapshot.root_char_bag;
        let next_entry_id = snapshot.next_entry_id.clone();

        // Sort by path, then drop every event whose path starts with the path of the event
        // kept just before it, so only the top-most changed paths remain.
        events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
        events.dedup_by(|a, b| a.path.starts_with(&b.path));

        // First pass: remove every affected path from the snapshot.
        for event in &events {
            match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => snapshot.remove_path(&path),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            }
        }

        // Second pass: re-read each affected path from the file system, insert whatever is
        // there now, and queue a scan job for every path that is a directory.
        let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
        for event in events {
            let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => Arc::from(path.to_path_buf()),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            };

            match self
                .fs
                .entry(
                    snapshot.root_char_bag,
                    &next_entry_id,
                    path.clone(),
                    &event.path,
                )
                .await
            {
                Ok(Some(mut fs_entry)) => {
                    let is_dir = fs_entry.is_dir();
                    let ignore_stack = snapshot.ignore_stack_for_path(&path, is_dir);
                    fs_entry.is_ignored = ignore_stack.is_all();
                    snapshot.insert_entry(fs_entry);
                    if is_dir {
                        scan_queue_tx
                            .send(ScanJob {
                                abs_path: event.path,
                                path,
                                ignore_stack,
                                scan_queue: scan_queue_tx.clone(),
                            })
                            .await
                            .unwrap();
                    }
                }
                // `Ok(None)`: no entry was returned for this path, so there is nothing to
                // insert; the path was already removed in the first pass.
                Ok(None) => {}
                Err(err) => {
                    // TODO - create a special 'error' entry in the entries tree to mark this
                    log::error!("error reading file on event {:?}", err);
                }
            }
        }

        // Store the updated snapshot; `scan_dir` writes into `self.snapshot` directly.
        *self.snapshot.lock() = snapshot;

        // Scan any directories that were created as part of this event batch.
        // Dropping our sender first leaves only the senders held by queued jobs, so `recv`
        // starts failing (ending the workers) once every job has been dropped.
        drop(scan_queue_tx);
        self.executor
            .scoped(|scope| {
                for _ in 0..self.executor.threads() {
                    scope.spawn(async {
                        while let Ok(job) = scan_queue_rx.recv().await {
                            if let Err(err) = self
                                .scan_dir(root_char_bag, next_entry_id.clone(), &job)
                                .await
                            {
                                log::error!("error scanning {:?}: {}", job.abs_path, err);
                            }
                        }
                    });
                }
            })
            .await;

        // Attempt to detect renames only over a single batch of file-system events.
        self.snapshot.lock().removed_entry_ids.clear();

        self.update_ignore_statuses().await;
        true
    }
2451
2452 async fn update_ignore_statuses(&self) {
2453 let mut snapshot = self.snapshot();
2454
2455 let mut ignores_to_update = Vec::new();
2456 let mut ignores_to_delete = Vec::new();
2457 for (parent_path, (_, scan_id)) in &snapshot.ignores {
2458 if *scan_id == snapshot.scan_id && snapshot.entry_for_path(parent_path).is_some() {
2459 ignores_to_update.push(parent_path.clone());
2460 }
2461
2462 let ignore_path = parent_path.join(&*GITIGNORE);
2463 if snapshot.entry_for_path(ignore_path).is_none() {
2464 ignores_to_delete.push(parent_path.clone());
2465 }
2466 }
2467
2468 for parent_path in ignores_to_delete {
2469 snapshot.ignores.remove(&parent_path);
2470 self.snapshot.lock().ignores.remove(&parent_path);
2471 }
2472
2473 let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
2474 ignores_to_update.sort_unstable();
2475 let mut ignores_to_update = ignores_to_update.into_iter().peekable();
2476 while let Some(parent_path) = ignores_to_update.next() {
2477 while ignores_to_update
2478 .peek()
2479 .map_or(false, |p| p.starts_with(&parent_path))
2480 {
2481 ignores_to_update.next().unwrap();
2482 }
2483
2484 let ignore_stack = snapshot.ignore_stack_for_path(&parent_path, true);
2485 ignore_queue_tx
2486 .send(UpdateIgnoreStatusJob {
2487 path: parent_path,
2488 ignore_stack,
2489 ignore_queue: ignore_queue_tx.clone(),
2490 })
2491 .await
2492 .unwrap();
2493 }
2494 drop(ignore_queue_tx);
2495
2496 self.executor
2497 .scoped(|scope| {
2498 for _ in 0..self.executor.threads() {
2499 scope.spawn(async {
2500 while let Ok(job) = ignore_queue_rx.recv().await {
2501 self.update_ignore_status(job, &snapshot).await;
2502 }
2503 });
2504 }
2505 })
2506 .await;
2507 }
2508
2509 async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) {
2510 let mut ignore_stack = job.ignore_stack;
2511 if let Some((ignore, _)) = snapshot.ignores.get(&job.path) {
2512 ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
2513 }
2514
2515 let mut edits = Vec::new();
2516 for mut entry in snapshot.child_entries(&job.path).cloned() {
2517 let was_ignored = entry.is_ignored;
2518 entry.is_ignored = ignore_stack.is_path_ignored(entry.path(), entry.is_dir());
2519 if entry.is_dir() {
2520 let child_ignore_stack = if entry.is_ignored {
2521 IgnoreStack::all()
2522 } else {
2523 ignore_stack.clone()
2524 };
2525 job.ignore_queue
2526 .send(UpdateIgnoreStatusJob {
2527 path: entry.path().clone(),
2528 ignore_stack: child_ignore_stack,
2529 ignore_queue: job.ignore_queue.clone(),
2530 })
2531 .await
2532 .unwrap();
2533 }
2534
2535 if entry.is_ignored != was_ignored {
2536 edits.push(Edit::Insert(entry));
2537 }
2538 }
2539 self.snapshot.lock().entries_by_path.edit(edits, &());
2540 }
2541}
2542
2543async fn refresh_entry(
2544 fs: &dyn Fs,
2545 snapshot: &Mutex<Snapshot>,
2546 path: Arc<Path>,
2547 abs_path: &Path,
2548) -> Result<Entry> {
2549 let root_char_bag;
2550 let next_entry_id;
2551 {
2552 let snapshot = snapshot.lock();
2553 root_char_bag = snapshot.root_char_bag;
2554 next_entry_id = snapshot.next_entry_id.clone();
2555 }
2556 let entry = fs
2557 .entry(root_char_bag, &next_entry_id, path, abs_path)
2558 .await?
2559 .ok_or_else(|| anyhow!("could not read saved file metadata"))?;
2560 Ok(snapshot.lock().insert_entry(entry))
2561}
2562
2563fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
2564 let mut result = root_char_bag;
2565 result.extend(
2566 path.to_string_lossy()
2567 .chars()
2568 .map(|c| c.to_ascii_lowercase()),
2569 );
2570 result
2571}
2572
/// A unit of work for the background scanner: one directory to list.
struct ScanJob {
    /// Absolute path of the directory; this is what is handed to the file system.
    abs_path: PathBuf,
    /// The directory's path as stored in the snapshot (used as the `populate_dir` key).
    path: Arc<Path>,
    /// Ignore rules in effect for the directory's children before its own `.gitignore`, if
    /// any, is applied.
    ignore_stack: Arc<IgnoreStack>,
    /// Sender on which jobs for discovered subdirectories are queued.
    scan_queue: Sender<ScanJob>,
}
2579
/// A unit of work for recomputing ignore flags: one directory whose children are re-evaluated.
struct UpdateIgnoreStatusJob {
    /// Snapshot path of the directory whose children are re-evaluated.
    path: Arc<Path>,
    /// Ignore rules inherited by the directory, before its own ignore file is appended.
    ignore_stack: Arc<IgnoreStack>,
    /// Sender on which jobs for child directories are queued.
    ignore_queue: Sender<UpdateIgnoreStatusJob>,
}
2585
/// Extension trait for worktree handles; its only method exists in test builds.
pub trait WorktreeHandle {
    /// Returns a boxed, non-`Send` future that borrows `cx` for `'a`.
    ///
    /// Only compiled under `cfg(test)`.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()>;
}
2593
2594impl WorktreeHandle for ModelHandle<Worktree> {
2595 // When the worktree's FS event stream sometimes delivers "redundant" events for FS changes that
2596 // occurred before the worktree was constructed. These events can cause the worktree to perfrom
2597 // extra directory scans, and emit extra scan-state notifications.
2598 //
2599 // This function mutates the worktree's directory and waits for those mutations to be picked up,
2600 // to ensure that all redundant FS events have already been processed.
2601 #[cfg(test)]
2602 fn flush_fs_events<'a>(
2603 &self,
2604 cx: &'a gpui::TestAppContext,
2605 ) -> futures::future::LocalBoxFuture<'a, ()> {
2606 use smol::future::FutureExt;
2607
2608 let filename = "fs-event-sentinel";
2609 let root_path = cx.read(|cx| self.read(cx).abs_path.clone());
2610 let tree = self.clone();
2611 async move {
2612 std::fs::write(root_path.join(filename), "").unwrap();
2613 tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_some())
2614 .await;
2615
2616 std::fs::remove_file(root_path.join(filename)).unwrap();
2617 tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_none())
2618 .await;
2619
2620 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
2621 .await;
2622 }
2623 .boxed_local()
2624 }
2625}
2626
2627pub enum FileIter<'a> {
2628 All(Cursor<'a, Entry, FileCount, ()>),
2629 Visible(Cursor<'a, Entry, VisibleFileCount, ()>),
2630}
2631
2632impl<'a> FileIter<'a> {
2633 fn all(snapshot: &'a Snapshot, start: usize) -> Self {
2634 let mut cursor = snapshot.entries_by_path.cursor();
2635 cursor.seek(&FileCount(start), Bias::Right, &());
2636 Self::All(cursor)
2637 }
2638
2639 fn visible(snapshot: &'a Snapshot, start: usize) -> Self {
2640 let mut cursor = snapshot.entries_by_path.cursor();
2641 cursor.seek(&VisibleFileCount(start), Bias::Right, &());
2642 Self::Visible(cursor)
2643 }
2644
2645 fn next_internal(&mut self) {
2646 match self {
2647 Self::All(cursor) => {
2648 let ix = *cursor.seek_start();
2649 cursor.seek_forward(&FileCount(ix.0 + 1), Bias::Right, &());
2650 }
2651 Self::Visible(cursor) => {
2652 let ix = *cursor.seek_start();
2653 cursor.seek_forward(&VisibleFileCount(ix.0 + 1), Bias::Right, &());
2654 }
2655 }
2656 }
2657
2658 fn item(&self) -> Option<&'a Entry> {
2659 match self {
2660 Self::All(cursor) => cursor.item(),
2661 Self::Visible(cursor) => cursor.item(),
2662 }
2663 }
2664}
2665
2666impl<'a> Iterator for FileIter<'a> {
2667 type Item = &'a Entry;
2668
2669 fn next(&mut self) -> Option<Self::Item> {
2670 if let Some(entry) = self.item() {
2671 self.next_internal();
2672 Some(entry)
2673 } else {
2674 None
2675 }
2676 }
2677}
2678
2679struct ChildEntriesIter<'a> {
2680 parent_path: &'a Path,
2681 cursor: Cursor<'a, Entry, PathSearch<'a>, ()>,
2682}
2683
2684impl<'a> ChildEntriesIter<'a> {
2685 fn new(parent_path: &'a Path, snapshot: &'a Snapshot) -> Self {
2686 let mut cursor = snapshot.entries_by_path.cursor();
2687 cursor.seek(&PathSearch::Exact(parent_path), Bias::Right, &());
2688 Self {
2689 parent_path,
2690 cursor,
2691 }
2692 }
2693}
2694
2695impl<'a> Iterator for ChildEntriesIter<'a> {
2696 type Item = &'a Entry;
2697
2698 fn next(&mut self) -> Option<Self::Item> {
2699 if let Some(item) = self.cursor.item() {
2700 if item.path().starts_with(self.parent_path) {
2701 self.cursor
2702 .seek_forward(&PathSearch::Successor(item.path()), Bias::Left, &());
2703 Some(item)
2704 } else {
2705 None
2706 }
2707 } else {
2708 None
2709 }
2710 }
2711}
2712
2713impl<'a> From<&'a Entry> for proto::Entry {
2714 fn from(entry: &'a Entry) -> Self {
2715 Self {
2716 id: entry.id as u64,
2717 is_dir: entry.is_dir(),
2718 path: entry.path.to_string_lossy().to_string(),
2719 inode: entry.inode,
2720 mtime: Some(entry.mtime.into()),
2721 is_symlink: entry.is_symlink,
2722 is_ignored: entry.is_ignored,
2723 }
2724 }
2725}
2726
2727impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
2728 type Error = anyhow::Error;
2729
2730 fn try_from((root_char_bag, entry): (&'a CharBag, proto::Entry)) -> Result<Self> {
2731 if let Some(mtime) = entry.mtime {
2732 let kind = if entry.is_dir {
2733 EntryKind::Dir
2734 } else {
2735 let mut char_bag = root_char_bag.clone();
2736 char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase()));
2737 EntryKind::File(char_bag)
2738 };
2739 let path: Arc<Path> = Arc::from(Path::new(&entry.path));
2740 Ok(Entry {
2741 id: entry.id as usize,
2742 kind,
2743 path: path.clone(),
2744 inode: entry.inode,
2745 mtime: mtime.into(),
2746 is_symlink: entry.is_symlink,
2747 is_ignored: entry.is_ignored,
2748 })
2749 } else {
2750 Err(anyhow!(
2751 "missing mtime in remote worktree entry {:?}",
2752 entry.path
2753 ))
2754 }
2755 }
2756}
2757
2758mod remote {
2759 use super::*;
2760
2761 pub async fn add_peer(
2762 envelope: TypedEnvelope<proto::AddPeer>,
2763 rpc: &rpc::Client,
2764 cx: &mut AsyncAppContext,
2765 ) -> anyhow::Result<()> {
2766 rpc.state
2767 .read()
2768 .await
2769 .shared_worktree(envelope.payload.worktree_id, cx)?
2770 .update(cx, |worktree, cx| worktree.add_peer(envelope, cx))
2771 }
2772
2773 pub async fn remove_peer(
2774 envelope: TypedEnvelope<proto::RemovePeer>,
2775 rpc: &rpc::Client,
2776 cx: &mut AsyncAppContext,
2777 ) -> anyhow::Result<()> {
2778 rpc.state
2779 .read()
2780 .await
2781 .shared_worktree(envelope.payload.worktree_id, cx)?
2782 .update(cx, |worktree, cx| worktree.remove_peer(envelope, cx))
2783 }
2784
2785 pub async fn update_worktree(
2786 envelope: TypedEnvelope<proto::UpdateWorktree>,
2787 rpc: &rpc::Client,
2788 cx: &mut AsyncAppContext,
2789 ) -> anyhow::Result<()> {
2790 rpc.state
2791 .read()
2792 .await
2793 .shared_worktree(envelope.payload.worktree_id, cx)?
2794 .update(cx, |worktree, _| {
2795 if let Some(worktree) = worktree.as_remote_mut() {
2796 let mut tx = worktree.updates_tx.clone();
2797 Ok(async move {
2798 tx.send(envelope.payload)
2799 .await
2800 .expect("receiver runs to completion");
2801 })
2802 } else {
2803 Err(anyhow!(
2804 "invalid update message for local worktree {}",
2805 envelope.payload.worktree_id
2806 ))
2807 }
2808 })?
2809 .await;
2810
2811 Ok(())
2812 }
2813
2814 pub async fn open_buffer(
2815 envelope: TypedEnvelope<proto::OpenBuffer>,
2816 rpc: &rpc::Client,
2817 cx: &mut AsyncAppContext,
2818 ) -> anyhow::Result<()> {
2819 let receipt = envelope.receipt();
2820 let worktree = rpc
2821 .state
2822 .read()
2823 .await
2824 .shared_worktree(envelope.payload.worktree_id, cx)?;
2825
2826 let response = worktree
2827 .update(cx, |worktree, cx| {
2828 worktree
2829 .as_local_mut()
2830 .unwrap()
2831 .open_remote_buffer(envelope, cx)
2832 })
2833 .await?;
2834
2835 rpc.respond(receipt, response).await?;
2836
2837 Ok(())
2838 }
2839
2840 pub async fn close_buffer(
2841 envelope: TypedEnvelope<proto::CloseBuffer>,
2842 rpc: &rpc::Client,
2843 cx: &mut AsyncAppContext,
2844 ) -> anyhow::Result<()> {
2845 let worktree = rpc
2846 .state
2847 .read()
2848 .await
2849 .shared_worktree(envelope.payload.worktree_id, cx)?;
2850
2851 worktree.update(cx, |worktree, cx| {
2852 worktree
2853 .as_local_mut()
2854 .unwrap()
2855 .close_remote_buffer(envelope, cx)
2856 })
2857 }
2858
2859 pub async fn update_buffer(
2860 envelope: TypedEnvelope<proto::UpdateBuffer>,
2861 rpc: &rpc::Client,
2862 cx: &mut AsyncAppContext,
2863 ) -> anyhow::Result<()> {
2864 eprintln!("got update buffer message {:?}", envelope.payload);
2865
2866 let message = envelope.payload;
2867 rpc.state
2868 .read()
2869 .await
2870 .shared_worktree(message.worktree_id, cx)?
2871 .update(cx, |tree, cx| tree.update_buffer(message, cx))?;
2872 Ok(())
2873 }
2874
2875 pub async fn save_buffer(
2876 envelope: TypedEnvelope<proto::SaveBuffer>,
2877 rpc: &rpc::Client,
2878 cx: &mut AsyncAppContext,
2879 ) -> anyhow::Result<()> {
2880 eprintln!("got save buffer message {:?}", envelope.payload);
2881
2882 let state = rpc.state.read().await;
2883 let worktree = state.shared_worktree(envelope.payload.worktree_id, cx)?;
2884 let sender_id = envelope.original_sender_id()?;
2885 let buffer = worktree.read_with(cx, |tree, _| {
2886 tree.as_local()
2887 .unwrap()
2888 .shared_buffers
2889 .get(&sender_id)
2890 .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
2891 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))
2892 })?;
2893 let (version, mtime) = buffer.update(cx, |buffer, cx| buffer.save(cx))?.await?;
2894 rpc.respond(
2895 envelope.receipt(),
2896 proto::BufferSaved {
2897 worktree_id: envelope.payload.worktree_id,
2898 buffer_id: envelope.payload.buffer_id,
2899 version: (&version).into(),
2900 mtime: Some(mtime.into()),
2901 },
2902 )
2903 .await?;
2904 Ok(())
2905 }
2906
2907 pub async fn buffer_saved(
2908 envelope: TypedEnvelope<proto::BufferSaved>,
2909 rpc: &rpc::Client,
2910 cx: &mut AsyncAppContext,
2911 ) -> anyhow::Result<()> {
2912 eprintln!("got buffer_saved {:?}", envelope.payload);
2913
2914 rpc.state
2915 .read()
2916 .await
2917 .shared_worktree(envelope.payload.worktree_id, cx)?
2918 .update(cx, |worktree, cx| {
2919 worktree.buffer_saved(envelope.payload, cx)
2920 })?;
2921 Ok(())
2922 }
2923}
2924
2925#[cfg(test)]
2926mod tests {
2927 use super::*;
2928 use crate::test::*;
2929 use anyhow::Result;
2930 use rand::prelude::*;
2931 use serde_json::json;
2932 use std::time::UNIX_EPOCH;
2933 use std::{env, fmt::Write, os::unix, time::SystemTime};
2934
2935 #[gpui::test]
2936 async fn test_populate_and_search(mut cx: gpui::TestAppContext) {
2937 let dir = temp_tree(json!({
2938 "root": {
2939 "apple": "",
2940 "banana": {
2941 "carrot": {
2942 "date": "",
2943 "endive": "",
2944 }
2945 },
2946 "fennel": {
2947 "grape": "",
2948 }
2949 }
2950 }));
2951
2952 let root_link_path = dir.path().join("root_link");
2953 unix::fs::symlink(&dir.path().join("root"), &root_link_path).unwrap();
2954 unix::fs::symlink(
2955 &dir.path().join("root/fennel"),
2956 &dir.path().join("root/finnochio"),
2957 )
2958 .unwrap();
2959
2960 let tree = cx.add_model(|cx| Worktree::local(root_link_path, Default::default(), cx));
2961
2962 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
2963 .await;
2964 cx.read(|cx| {
2965 let tree = tree.read(cx);
2966 assert_eq!(tree.file_count(), 5);
2967
2968 assert_eq!(
2969 tree.inode_for_path("fennel/grape"),
2970 tree.inode_for_path("finnochio/grape")
2971 );
2972
2973 let results = match_paths(
2974 Some(tree.snapshot()).iter(),
2975 "bna",
2976 false,
2977 false,
2978 false,
2979 10,
2980 Default::default(),
2981 cx.thread_pool().clone(),
2982 )
2983 .into_iter()
2984 .map(|result| result.path)
2985 .collect::<Vec<Arc<Path>>>();
2986 assert_eq!(
2987 results,
2988 vec![
2989 PathBuf::from("banana/carrot/date").into(),
2990 PathBuf::from("banana/carrot/endive").into(),
2991 ]
2992 );
2993 })
2994 }
2995
2996 #[gpui::test]
2997 async fn test_save_file(mut cx: gpui::TestAppContext) {
2998 let app_state = cx.read(build_app_state);
2999 let dir = temp_tree(json!({
3000 "file1": "the old contents",
3001 }));
3002 let tree = cx.add_model(|cx| Worktree::local(dir.path(), app_state.languages.clone(), cx));
3003 let buffer = tree
3004 .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
3005 .await
3006 .unwrap();
3007 let save = buffer.update(&mut cx, |buffer, cx| {
3008 buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
3009 buffer.save(cx).unwrap()
3010 });
3011 save.await.unwrap();
3012
3013 let new_text = std::fs::read_to_string(dir.path().join("file1")).unwrap();
3014 assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
3015 }
3016
3017 #[gpui::test]
3018 async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) {
3019 let app_state = cx.read(build_app_state);
3020 let dir = temp_tree(json!({
3021 "file1": "the old contents",
3022 }));
3023 let file_path = dir.path().join("file1");
3024
3025 let tree =
3026 cx.add_model(|cx| Worktree::local(file_path.clone(), app_state.languages.clone(), cx));
3027 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3028 .await;
3029 cx.read(|cx| assert_eq!(tree.read(cx).file_count(), 1));
3030
3031 let buffer = tree
3032 .update(&mut cx, |tree, cx| tree.open_buffer("", cx))
3033 .await
3034 .unwrap();
3035 let save = buffer.update(&mut cx, |buffer, cx| {
3036 buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
3037 buffer.save(cx).unwrap()
3038 });
3039 save.await.unwrap();
3040
3041 let new_text = std::fs::read_to_string(file_path).unwrap();
3042 assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
3043 }
3044
    /// Renames and deletes files and directories underneath a scanned worktree, then checks
    /// that the local tree, the entry ids, and the open buffers follow the changes, and that
    /// a remote copy of the worktree becomes consistent after applying one update message.
    #[gpui::test]
    async fn test_rescan_and_remote_updates(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "a": {
                "file1": "",
                "file2": "",
                "file3": "",
            },
            "b": {
                "c": {
                    "file4": "",
                    "file5": "",
                }
            }
        }));

        let tree = cx.add_model(|cx| Worktree::local(dir.path(), Default::default(), cx));

        // Helpers: open a buffer by worktree-relative path, and look up an entry id by path.
        let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| {
            let buffer = tree.update(cx, |tree, cx| tree.open_buffer(path, cx));
            async move { buffer.await.unwrap() }
        };
        let id_for_path = |path: &'static str, cx: &gpui::TestAppContext| {
            tree.read_with(cx, |tree, _| {
                tree.entry_for_path(path)
                    .expect(&format!("no entry for path {}", path))
                    .id
            })
        };

        let buffer2 = buffer_for_path("a/file2", &mut cx).await;
        let buffer3 = buffer_for_path("a/file3", &mut cx).await;
        let buffer4 = buffer_for_path("b/c/file4", &mut cx).await;
        let buffer5 = buffer_for_path("b/c/file5", &mut cx).await;

        // Remember the ids before anything moves; they are expected to survive the renames.
        let file2_id = id_for_path("a/file2", &cx);
        let file3_id = id_for_path("a/file3", &cx);
        let file4_id = id_for_path("b/c/file4", &cx);

        // Wait for the initial scan.
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;

        // Create a remote copy of this worktree.
        let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
        let worktree_id = 1;
        let share_request = tree
            .update(&mut cx, |tree, cx| {
                tree.as_local().unwrap().share_request(cx)
            })
            .await;
        let remote = Worktree::remote(
            proto::OpenWorktreeResponse {
                worktree_id,
                worktree: share_request.worktree,
                replica_id: 1,
                peers: Vec::new(),
            },
            rpc::Client::new(Default::default()),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        cx.read(|cx| {
            assert!(!buffer2.read(cx).is_dirty());
            assert!(!buffer3.read(cx).is_dirty());
            assert!(!buffer4.read(cx).is_dirty());
            assert!(!buffer5.read(cx).is_dirty());
        });

        // Rename and delete files and directories.
        tree.flush_fs_events(&cx).await;
        std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
        std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
        std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap();
        std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
        tree.flush_fs_events(&cx).await;

        let expected_paths = vec![
            "a",
            "a/file1",
            "a/file2.new",
            "b",
            "d",
            "d/file3",
            "d/file4",
        ];

        cx.read(|app| {
            assert_eq!(
                tree.read(app)
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );

            // Renamed entries keep the ids they had before the rename.
            assert_eq!(id_for_path("a/file2.new", &cx), file2_id);
            assert_eq!(id_for_path("d/file3", &cx), file3_id);
            assert_eq!(id_for_path("d/file4", &cx), file4_id);

            // Open buffers follow their files to the new paths; the deleted file's buffer
            // keeps its old path.
            assert_eq!(
                buffer2.read(app).file().unwrap().path().as_ref(),
                Path::new("a/file2.new")
            );
            assert_eq!(
                buffer3.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file3")
            );
            assert_eq!(
                buffer4.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file4")
            );
            assert_eq!(
                buffer5.read(app).file().unwrap().path().as_ref(),
                Path::new("b/c/file5")
            );

            assert!(!buffer2.read(app).file().unwrap().is_deleted());
            assert!(!buffer3.read(app).file().unwrap().is_deleted());
            assert!(!buffer4.read(app).file().unwrap().is_deleted());
            assert!(buffer5.read(app).file().unwrap().is_deleted());
        });

        // Update the remote worktree. Check that it becomes consistent with the
        // local worktree.
        remote.update(&mut cx, |remote, cx| {
            let update_message = tree
                .read(cx)
                .snapshot()
                .build_update(&initial_snapshot, worktree_id);
            remote
                .as_remote_mut()
                .unwrap()
                .snapshot
                .apply_update(update_message)
                .unwrap();

            assert_eq!(
                remote
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );
        });
    }
3194
3195 #[gpui::test]
3196 async fn test_rescan_with_gitignore(mut cx: gpui::TestAppContext) {
3197 let dir = temp_tree(json!({
3198 ".git": {},
3199 ".gitignore": "ignored-dir\n",
3200 "tracked-dir": {
3201 "tracked-file1": "tracked contents",
3202 },
3203 "ignored-dir": {
3204 "ignored-file1": "ignored contents",
3205 }
3206 }));
3207
3208 let tree = cx.add_model(|cx| Worktree::local(dir.path(), Default::default(), cx));
3209 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3210 .await;
3211 tree.flush_fs_events(&cx).await;
3212 cx.read(|cx| {
3213 let tree = tree.read(cx);
3214 let tracked = tree.entry_for_path("tracked-dir/tracked-file1").unwrap();
3215 let ignored = tree.entry_for_path("ignored-dir/ignored-file1").unwrap();
3216 assert_eq!(tracked.is_ignored(), false);
3217 assert_eq!(ignored.is_ignored(), true);
3218 });
3219
3220 std::fs::write(dir.path().join("tracked-dir/tracked-file2"), "").unwrap();
3221 std::fs::write(dir.path().join("ignored-dir/ignored-file2"), "").unwrap();
3222 tree.flush_fs_events(&cx).await;
3223 cx.read(|cx| {
3224 let tree = tree.read(cx);
3225 let dot_git = tree.entry_for_path(".git").unwrap();
3226 let tracked = tree.entry_for_path("tracked-dir/tracked-file2").unwrap();
3227 let ignored = tree.entry_for_path("ignored-dir/ignored-file2").unwrap();
3228 assert_eq!(tracked.is_ignored(), false);
3229 assert_eq!(ignored.is_ignored(), true);
3230 assert_eq!(dot_git.is_ignored(), true);
3231 });
3232 }
3233
    /// Randomized test: mutates a temporary directory tree while feeding the corresponding
    /// events to a `BackgroundScanner` in random batches, then checks that the incrementally
    /// maintained snapshot equals one produced by scanning the final tree from scratch.
    ///
    /// Tunable through the `ITERATIONS`, `OPERATIONS`, `INITIAL_ENTRIES` and `SEED`
    /// environment variables; setting `SEED` runs only that seed.
    #[test]
    fn test_random() {
        let iterations = env::var("ITERATIONS")
            .map(|i| i.parse().unwrap())
            .unwrap_or(100);
        let operations = env::var("OPERATIONS")
            .map(|o| o.parse().unwrap())
            .unwrap_or(40);
        let initial_entries = env::var("INITIAL_ENTRIES")
            .map(|o| o.parse().unwrap())
            .unwrap_or(20);
        let seeds = if let Ok(seed) = env::var("SEED").map(|s| s.parse().unwrap()) {
            seed..seed + 1
        } else {
            0..iterations
        };

        for seed in seeds {
            // Print the seed so a failing iteration can be re-run with `SEED`.
            dbg!(seed);
            let mut rng = StdRng::seed_from_u64(seed);

            // Build the starting tree; probability 1.0 makes every mutation an insertion.
            let root_dir = tempdir::TempDir::new(&format!("test-{}", seed)).unwrap();
            for _ in 0..initial_entries {
                randomly_mutate_tree(root_dir.path(), 1.0, &mut rng).unwrap();
            }
            log::info!("Generated initial tree");

            let (notify_tx, _notify_rx) = smol::channel::unbounded();
            let mut scanner = BackgroundScanner::new(
                Arc::new(Mutex::new(Snapshot {
                    id: 0,
                    scan_id: 0,
                    abs_path: root_dir.path().into(),
                    entries_by_path: Default::default(),
                    entries_by_id: Default::default(),
                    removed_entry_ids: Default::default(),
                    ignores: Default::default(),
                    root_name: Default::default(),
                    root_char_bag: Default::default(),
                    next_entry_id: Default::default(),
                })),
                notify_tx,
                Arc::new(ProductionFs),
                Arc::new(gpui::executor::Background::new()),
            );
            smol::block_on(scanner.scan_dirs()).unwrap();
            scanner.snapshot().check_invariants();

            // Interleave mutations with delivery of a random-length prefix of the pending
            // events. Only mutations count down `mutations_len`.
            let mut events = Vec::new();
            let mut mutations_len = operations;
            while mutations_len > 1 {
                if !events.is_empty() && rng.gen_bool(0.4) {
                    let len = rng.gen_range(0..=events.len());
                    let to_deliver = events.drain(0..len).collect::<Vec<_>>();
                    log::info!("Delivering events: {:#?}", to_deliver);
                    smol::block_on(scanner.process_events(to_deliver));
                    scanner.snapshot().check_invariants();
                } else {
                    events.extend(randomly_mutate_tree(root_dir.path(), 0.6, &mut rng).unwrap());
                    mutations_len -= 1;
                }
            }
            // Deliver whatever events are still pending.
            log::info!("Quiescing: {:#?}", events);
            smol::block_on(scanner.process_events(events));
            scanner.snapshot().check_invariants();

            // Scan the final tree from scratch with a second scanner and compare.
            let (notify_tx, _notify_rx) = smol::channel::unbounded();
            let mut new_scanner = BackgroundScanner::new(
                Arc::new(Mutex::new(Snapshot {
                    id: 0,
                    scan_id: 0,
                    abs_path: root_dir.path().into(),
                    entries_by_path: Default::default(),
                    entries_by_id: Default::default(),
                    removed_entry_ids: Default::default(),
                    ignores: Default::default(),
                    root_name: Default::default(),
                    root_char_bag: Default::default(),
                    next_entry_id: Default::default(),
                })),
                notify_tx,
                scanner.fs.clone(),
                scanner.executor.clone(),
            );
            smol::block_on(new_scanner.scan_dirs()).unwrap();
            assert_eq!(scanner.snapshot().to_vec(), new_scanner.snapshot().to_vec());
        }
    }
3322
    /// Applies one random mutation to the directory tree under `root_path` and returns the
    /// events recorded for the affected paths.
    ///
    /// With probability `insertion_probability` (or always, when the tree holds nothing but
    /// its root directory) a new file or directory is created. Otherwise, with probability
    /// 0.05 a `.gitignore` is written into a random directory; failing that, a random file or
    /// non-root directory is renamed or deleted.
    fn randomly_mutate_tree(
        root_path: &Path,
        insertion_probability: f64,
        rng: &mut impl Rng,
    ) -> Result<Vec<fsevent::Event>> {
        let root_path = root_path.canonicalize().unwrap();
        // `dirs[0]` is the root itself (see `dirs[1..]` below).
        let (dirs, files) = read_dir_recursive(root_path.clone());

        let mut events = Vec::new();
        // Records an event for `path`, using the current Unix time in seconds as its id.
        let mut record_event = |path: PathBuf| {
            events.push(fsevent::Event {
                event_id: SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                flags: fsevent::StreamFlags::empty(),
                path,
            });
        };

        if (files.is_empty() && dirs.len() == 1) || rng.gen_bool(insertion_probability) {
            // Insertion: create a file or directory with a random name in a random directory.
            let path = dirs.choose(rng).unwrap();
            let new_path = path.join(gen_name(rng));

            if rng.gen() {
                log::info!("Creating dir {:?}", new_path.strip_prefix(root_path)?);
                std::fs::create_dir(&new_path)?;
            } else {
                log::info!("Creating file {:?}", new_path.strip_prefix(root_path)?);
                std::fs::write(&new_path, "")?;
            }
            record_event(new_path);
        } else if rng.gen_bool(0.05) {
            // Write a `.gitignore` listing a random subset of the files and directories
            // beneath a randomly chosen directory.
            let ignore_dir_path = dirs.choose(rng).unwrap();
            let ignore_path = ignore_dir_path.join(&*GITIGNORE);

            let (subdirs, subfiles) = read_dir_recursive(ignore_dir_path.clone());
            let files_to_ignore = {
                let len = rng.gen_range(0..=subfiles.len());
                subfiles.choose_multiple(rng, len)
            };
            let dirs_to_ignore = {
                let len = rng.gen_range(0..subdirs.len());
                subdirs.choose_multiple(rng, len)
            };

            // One path per line, relative to the directory holding the ignore file.
            let mut ignore_contents = String::new();
            for path_to_ignore in files_to_ignore.chain(dirs_to_ignore) {
                write!(
                    ignore_contents,
                    "{}\n",
                    path_to_ignore
                        .strip_prefix(&ignore_dir_path)?
                        .to_str()
                        .unwrap()
                )
                .unwrap();
            }
            log::info!(
                "Creating {:?} with contents:\n{}",
                ignore_path.strip_prefix(&root_path)?,
                ignore_contents
            );
            std::fs::write(&ignore_path, ignore_contents).unwrap();
            record_event(ignore_path);
        } else {
            // Rename or delete: pick one candidate file and one candidate non-root directory
            // (either may be absent), then choose between them.
            let old_path = {
                let file_path = files.choose(rng);
                let dir_path = dirs[1..].choose(rng);
                file_path.into_iter().chain(dir_path).choose(rng).unwrap()
            };

            let is_rename = rng.gen();
            if is_rename {
                // The destination parent must not be `old_path` or anything beneath it.
                let new_path_parent = dirs
                    .iter()
                    .filter(|d| !d.starts_with(old_path))
                    .choose(rng)
                    .unwrap();

                // Occasionally rename onto the chosen directory itself, replacing it, rather
                // than to a fresh name inside it.
                let overwrite_existing_dir =
                    !old_path.starts_with(&new_path_parent) && rng.gen_bool(0.3);
                let new_path = if overwrite_existing_dir {
                    std::fs::remove_dir_all(&new_path_parent).ok();
                    new_path_parent.to_path_buf()
                } else {
                    new_path_parent.join(gen_name(rng))
                };

                log::info!(
                    "Renaming {:?} to {}{:?}",
                    old_path.strip_prefix(&root_path)?,
                    if overwrite_existing_dir {
                        "overwrite "
                    } else {
                        ""
                    },
                    new_path.strip_prefix(&root_path)?
                );
                std::fs::rename(&old_path, &new_path)?;
                record_event(old_path.clone());
                record_event(new_path);
            } else if old_path.is_dir() {
                // Snapshot the subtree first so an event is recorded for every path in it.
                let (dirs, files) = read_dir_recursive(old_path.clone());

                log::info!("Deleting dir {:?}", old_path.strip_prefix(&root_path)?);
                std::fs::remove_dir_all(&old_path).unwrap();
                for file in files {
                    record_event(file);
                }
                for dir in dirs {
                    record_event(dir);
                }
            } else {
                log::info!("Deleting file {:?}", old_path.strip_prefix(&root_path)?);
                std::fs::remove_file(old_path).unwrap();
                record_event(old_path.clone());
            }
        }

        Ok(events)
    }
3445
3446 fn read_dir_recursive(path: PathBuf) -> (Vec<PathBuf>, Vec<PathBuf>) {
3447 let child_entries = std::fs::read_dir(&path).unwrap();
3448 let mut dirs = vec![path];
3449 let mut files = Vec::new();
3450 for child_entry in child_entries {
3451 let child_path = child_entry.unwrap().path();
3452 if child_path.is_dir() {
3453 let (child_dirs, child_files) = read_dir_recursive(child_path);
3454 dirs.extend(child_dirs);
3455 files.extend(child_files);
3456 } else {
3457 files.push(child_path);
3458 }
3459 }
3460 (dirs, files)
3461 }
3462
3463 fn gen_name(rng: &mut impl Rng) -> String {
3464 (0..6)
3465 .map(|_| rng.sample(rand::distributions::Alphanumeric))
3466 .map(char::from)
3467 .collect()
3468 }
3469
3470 impl Snapshot {
3471 fn check_invariants(&self) {
3472 let mut files = self.files(0);
3473 let mut visible_files = self.visible_files(0);
3474 for entry in self.entries_by_path.cursor::<(), ()>() {
3475 if entry.is_file() {
3476 assert_eq!(files.next().unwrap().inode(), entry.inode);
3477 if !entry.is_ignored {
3478 assert_eq!(visible_files.next().unwrap().inode(), entry.inode);
3479 }
3480 }
3481 }
3482 assert!(files.next().is_none());
3483 assert!(visible_files.next().is_none());
3484
3485 let mut bfs_paths = Vec::new();
3486 let mut stack = vec![Path::new("")];
3487 while let Some(path) = stack.pop() {
3488 bfs_paths.push(path);
3489 let ix = stack.len();
3490 for child_entry in self.child_entries(path) {
3491 stack.insert(ix, child_entry.path());
3492 }
3493 }
3494
3495 let dfs_paths = self
3496 .entries_by_path
3497 .cursor::<(), ()>()
3498 .map(|e| e.path().as_ref())
3499 .collect::<Vec<_>>();
3500 assert_eq!(bfs_paths, dfs_paths);
3501
3502 for (ignore_parent_path, _) in &self.ignores {
3503 assert!(self.entry_for_path(ignore_parent_path).is_some());
3504 assert!(self
3505 .entry_for_path(ignore_parent_path.join(&*GITIGNORE))
3506 .is_some());
3507 }
3508 }
3509
3510 fn to_vec(&self) -> Vec<(&Path, u64, bool)> {
3511 let mut paths = Vec::new();
3512 for entry in self.entries_by_path.cursor::<(), ()>() {
3513 paths.push((entry.path().as_ref(), entry.inode(), entry.is_ignored()));
3514 }
3515 paths.sort_by(|a, b| a.0.cmp(&b.0));
3516 paths
3517 }
3518 }
3519}