1mod char_bag;
2mod fuzzy;
3mod ignore;
4
5use self::{char_bag::CharBag, ignore::IgnoreStack};
6use crate::{
7 editor::{self, Buffer, History, Operation, Rope},
8 language::LanguageRegistry,
9 rpc::{self, proto},
10 sum_tree::{self, Cursor, Edit, SumTree},
11 time::{self, ReplicaId},
12 util::Bias,
13};
14use ::ignore::gitignore::Gitignore;
15use anyhow::{anyhow, Context, Result};
16use atomic::Ordering::SeqCst;
17use futures::{Stream, StreamExt};
18pub use fuzzy::{match_paths, PathMatch};
19use gpui::{
20 executor, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
21 Task, WeakModelHandle,
22};
23use lazy_static::lazy_static;
24use parking_lot::Mutex;
25use postage::{
26 prelude::{Sink as _, Stream as _},
27 watch,
28};
29use smol::{
30 channel::{self, Sender},
31 io::{AsyncReadExt, AsyncWriteExt},
32};
33use std::{
34 cmp::{self, Ordering},
35 collections::HashMap,
36 convert::{TryFrom, TryInto},
37 ffi::{OsStr, OsString},
38 fmt,
39 future::Future,
40 io,
41 ops::Deref,
42 os::unix::fs::MetadataExt,
43 path::{Path, PathBuf},
44 pin::Pin,
45 sync::{
46 atomic::{self, AtomicUsize},
47 Arc,
48 },
49 time::{Duration, SystemTime},
50};
51use zed_rpc::{ForegroundRouter, PeerId, TypedEnvelope};
52
lazy_static! {
    // File name that triggers gitignore parsing during scans; interned once
    // so directory traversal can compare `OsStr`s without allocating.
    static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
}
56
/// Registers all worktree-related RPC message handlers on the client's
/// foreground router. Each `remote::*` handler routes an incoming message to
/// the corresponding worktree model method (see the `remote` module).
pub fn init(cx: &mut MutableAppContext, rpc: &rpc::Client, router: &mut ForegroundRouter) {
    rpc.on_message(router, remote::add_peer, cx);
    rpc.on_message(router, remote::remove_peer, cx);
    rpc.on_message(router, remote::update_worktree, cx);
    rpc.on_message(router, remote::open_buffer, cx);
    rpc.on_message(router, remote::close_buffer, cx);
    rpc.on_message(router, remote::update_buffer, cx);
    rpc.on_message(router, remote::buffer_saved, cx);
    rpc.on_message(router, remote::save_buffer, cx);
}
67
/// Abstraction over the filesystem used by worktree scanning and buffer I/O.
/// Implemented by `ProductionFs` (real disk) and, under test, `InMemoryFs`.
#[async_trait::async_trait]
pub trait Fs: Send + Sync {
    /// Stats `abs_path` and produces an `Entry` describing it, or `Ok(None)`
    /// if the path does not exist. `path` is the worktree-relative path
    /// stored in the entry; `next_entry_id` supplies a fresh entry id.
    async fn entry(
        &self,
        root_char_bag: CharBag,
        next_entry_id: &AtomicUsize,
        path: Arc<Path>,
        abs_path: &Path,
    ) -> Result<Option<Entry>>;
    /// Streams an `Entry` for each direct child of the directory at
    /// `abs_path`. `path` is the worktree-relative directory path used to
    /// build each child's relative path.
    async fn child_entries<'a>(
        &self,
        root_char_bag: CharBag,
        next_entry_id: &'a AtomicUsize,
        path: &'a Path,
        abs_path: &'a Path,
    ) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>>;
    /// Reads the file at `path` to a UTF-8 string.
    async fn load(&self, path: &Path) -> Result<String>;
    /// Writes the contents of `text` to the file at `path`, creating or
    /// truncating it.
    async fn save(&self, path: &Path, text: &Rope) -> Result<()>;
    /// Resolves symlinks and relative components in `path`.
    async fn canonicalize(&self, path: &Path) -> Result<PathBuf>;
}
88
/// `Fs` implementation backed by the real filesystem via `smol::fs`.
struct ProductionFs;
90
#[async_trait::async_trait]
impl Fs for ProductionFs {
    /// Stats `abs_path` and builds an `Entry` for it. Returns `Ok(None)`
    /// when the path no longer exists, so callers can treat races with
    /// concurrent deletion as a simple absence rather than an error.
    async fn entry(
        &self,
        root_char_bag: CharBag,
        next_entry_id: &AtomicUsize,
        path: Arc<Path>,
        abs_path: &Path,
    ) -> Result<Option<Entry>> {
        let metadata = match smol::fs::metadata(&abs_path).await {
            Err(err) => {
                return match (err.kind(), err.raw_os_error()) {
                    // The path was removed between discovery and stat.
                    (io::ErrorKind::NotFound, _) => Ok(None),
                    // A parent component is a regular file (ENOTDIR surfaces
                    // with kind `Other` here); treat this as missing too.
                    (io::ErrorKind::Other, Some(libc::ENOTDIR)) => Ok(None),
                    _ => Err(anyhow::Error::new(err)),
                }
            }
            Ok(metadata) => metadata,
        };
        let inode = metadata.ino();
        let mtime = metadata.modified()?;
        // `metadata` follows symlinks, so a separate lstat is needed to tell
        // whether the entry itself is a symlink.
        let is_symlink = smol::fs::symlink_metadata(&abs_path)
            .await
            .context("failed to read symlink metadata")?
            .file_type()
            .is_symlink();

        let entry = Entry {
            id: next_entry_id.fetch_add(1, SeqCst),
            // Directories start as `PendingDir` until the scanner descends
            // into them; files carry a char bag for fuzzy matching.
            kind: if metadata.file_type().is_dir() {
                EntryKind::PendingDir
            } else {
                EntryKind::File(char_bag_for_path(root_char_bag, &path))
            },
            path: Arc::from(path),
            inode,
            mtime,
            is_symlink,
            is_ignored: false,
        };

        Ok(Some(entry))
    }

    /// Streams one `Entry` per direct child of the directory at `abs_path`,
    /// stat-ing each child lazily as the stream is polled.
    async fn child_entries<'a>(
        &self,
        root_char_bag: CharBag,
        next_entry_id: &'a AtomicUsize,
        path: &'a Path,
        abs_path: &'a Path,
    ) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>> {
        let entries = smol::fs::read_dir(abs_path).await?;
        Ok(entries
            .then(move |entry| async move {
                let child_entry = entry?;
                let child_name = child_entry.file_name();
                let child_path: Arc<Path> = path.join(&child_name).into();
                let child_abs_path = abs_path.join(&child_name);
                // DirEntry::metadata does not traverse symlinks, so it is
                // used for the symlink check, while `smol::fs::metadata`
                // (which follows links) supplies inode/mtime/file type.
                let child_is_symlink = child_entry.metadata().await?.file_type().is_symlink();
                let child_metadata = smol::fs::metadata(child_abs_path).await?;
                let child_inode = child_metadata.ino();
                let child_mtime = child_metadata.modified()?;
                Ok(Entry {
                    id: next_entry_id.fetch_add(1, SeqCst),
                    kind: if child_metadata.file_type().is_dir() {
                        EntryKind::PendingDir
                    } else {
                        EntryKind::File(char_bag_for_path(root_char_bag, &child_path))
                    },
                    path: child_path,
                    inode: child_inode,
                    mtime: child_mtime,
                    is_symlink: child_is_symlink,
                    is_ignored: false,
                })
            })
            .boxed())
    }

    /// Reads the entire file at `path` into a `String`.
    async fn load(&self, path: &Path) -> Result<String> {
        let mut file = smol::fs::File::open(path).await?;
        let mut text = String::new();
        file.read_to_string(&mut text).await?;
        Ok(text)
    }

    /// Writes `text` to `path` chunk by chunk through a buffered writer.
    /// The buffer is sized to the text, capped at 10 KiB.
    async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
        let buffer_size = text.summary().bytes.min(10 * 1024);
        let file = smol::fs::File::create(path).await?;
        let mut writer = smol::io::BufWriter::with_capacity(buffer_size, file);
        for chunk in text.chunks() {
            writer.write_all(chunk.as_bytes()).await?;
        }
        writer.flush().await?;
        Ok(())
    }

    /// Canonicalizes `path` via the OS (resolves symlinks and `..`).
    async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
        Ok(smol::fs::canonicalize(path).await?)
    }
}
192
/// A single node in the in-memory test filesystem: either a directory
/// (`is_dir == true`, `content == None`) or a file with optional text
/// content. Only used by `InMemoryFs`/`InMemoryFsState`, which are both
/// gated on test builds — this type was previously compiled (as dead code)
/// into production builds, so it now carries the same cfg gate.
#[cfg(any(test, feature = "test-support"))]
#[derive(Clone, Debug)]
struct InMemoryEntry {
    inode: u64,          // fake inode number assigned by `InMemoryFsState::next_inode`
    mtime: SystemTime,   // last-modified time, refreshed on every save
    is_dir: bool,
    is_symlink: bool,
    content: Option<String>, // `None` for directories
}
201
/// Mutable interior of `InMemoryFs`: a path-keyed tree of entries plus an
/// event channel that mimics `fsevent` notifications for the scanner.
#[cfg(any(test, feature = "test-support"))]
struct InMemoryFsState {
    // BTreeMap keeps paths sorted, so children of a directory are contiguous.
    entries: std::collections::BTreeMap<PathBuf, InMemoryEntry>,
    // Next fake inode number to hand out.
    next_inode: u64,
    // Broadcasts a synthetic FS event for every mutation.
    events_tx: postage::broadcast::Sender<fsevent::Event>,
}
208
#[cfg(any(test, feature = "test-support"))]
impl InMemoryFsState {
    /// A path is acceptable only when it is absolute and its parent both
    /// exists in the tree and is a directory.
    fn validate_path(&self, path: &Path) -> Result<()> {
        if !path.is_absolute() {
            return Err(anyhow!("invalid path {:?}", path));
        }
        let parent_is_dir = path
            .parent()
            .and_then(|parent| self.entries.get(parent))
            .map_or(false, |entry| entry.is_dir);
        if parent_is_dir {
            Ok(())
        } else {
            Err(anyhow!("invalid path {:?}", path))
        }
    }

    /// Broadcasts a synthetic filesystem event for `path`. Send failures are
    /// ignored: there may simply be no subscribers.
    async fn emit_event(&mut self, path: &Path) {
        let event = fsevent::Event {
            event_id: 0,
            flags: fsevent::StreamFlags::empty(),
            path: path.to_path_buf(),
        };
        self.events_tx.send(event).await.ok();
    }
}
235
/// Test-only `Fs` implementation that stores the entire filesystem in
/// memory and broadcasts synthetic events on mutation.
#[cfg(any(test, feature = "test-support"))]
pub struct InMemoryFs {
    // RwLock so concurrent readers (scans, loads) don't block each other.
    state: smol::lock::RwLock<InMemoryFsState>,
}
240
#[cfg(any(test, feature = "test-support"))]
impl InMemoryFs {
    /// Creates an empty in-memory filesystem containing only the root
    /// directory `/` (inode 0).
    pub fn new() -> Self {
        let (events_tx, _) = postage::broadcast::channel(2048);
        let mut entries = std::collections::BTreeMap::new();
        entries.insert(
            Path::new("/").to_path_buf(),
            InMemoryEntry {
                inode: 0,
                mtime: SystemTime::now(),
                is_dir: true,
                is_symlink: false,
                content: None,
            },
        );
        Self {
            state: smol::lock::RwLock::new(InMemoryFsState {
                entries,
                next_inode: 1,
                events_tx,
            }),
        }
    }

    /// Creates a directory at `path`. The parent must already exist.
    pub async fn insert_dir(&self, path: &Path) -> Result<()> {
        let mut state = self.state.write().await;
        state.validate_path(path)?;

        let inode = state.next_inode;
        state.next_inode += 1;
        state.entries.insert(
            path.to_path_buf(),
            InMemoryEntry {
                inode,
                mtime: SystemTime::now(),
                is_dir: true,
                is_symlink: false,
                content: None,
            },
        );
        state.emit_event(path).await;
        Ok(())
    }

    /// Removes `path` and everything beneath it.
    pub async fn remove(&self, path: &Path) -> Result<()> {
        let mut state = self.state.write().await;
        state.validate_path(path)?;
        // BUG FIX: the closure parameter used to be named `path`, shadowing
        // the argument. `!path.starts_with(path)` then compared each key with
        // itself — always false — so *every* entry in the tree was deleted,
        // not just the requested subtree.
        state
            .entries
            .retain(|existing_path, _| !existing_path.starts_with(path));
        state.emit_event(path).await;
        Ok(())
    }

    /// Moves the subtree rooted at `source` to `target`. Fails if `target`
    /// already exists.
    pub async fn rename(&self, source: &Path, target: &Path) -> Result<()> {
        let mut state = self.state.write().await;
        state.validate_path(source)?;
        state.validate_path(target)?;
        if state.entries.contains_key(target) {
            Err(anyhow!("target path already exists"))
        } else {
            // Detach every entry under `source`, remembering its path
            // relative to `source` so it can be re-rooted under `target`.
            let mut removed = Vec::new();
            state.entries.retain(|path, entry| {
                if let Ok(relative_path) = path.strip_prefix(source) {
                    removed.push((relative_path.to_path_buf(), entry.clone()));
                    false
                } else {
                    true
                }
            });

            for (relative_path, entry) in removed {
                let new_path = target.join(relative_path);
                state.entries.insert(new_path, entry);
            }

            state.emit_event(source).await;
            state.emit_event(target).await;

            Ok(())
        }
    }

    /// Subscribes to the synthetic filesystem event stream.
    pub async fn events(&self) -> postage::broadcast::Receiver<fsevent::Event> {
        self.state.read().await.events_tx.subscribe()
    }
}
326
#[cfg(any(test, feature = "test-support"))]
#[async_trait::async_trait]
impl Fs for InMemoryFs {
    /// Looks up `abs_path` in the in-memory tree and converts it into an
    /// `Entry`, or returns `Ok(None)` if it does not exist.
    async fn entry(
        &self,
        root_char_bag: CharBag,
        next_entry_id: &AtomicUsize,
        path: Arc<Path>,
        abs_path: &Path,
    ) -> Result<Option<Entry>> {
        let state = self.state.read().await;
        if let Some(entry) = state.entries.get(abs_path) {
            Ok(Some(Entry {
                id: next_entry_id.fetch_add(1, SeqCst),
                kind: if entry.is_dir {
                    EntryKind::PendingDir
                } else {
                    EntryKind::File(char_bag_for_path(root_char_bag, &path))
                },
                path: Arc::from(path),
                inode: entry.inode,
                mtime: entry.mtime,
                is_symlink: entry.is_symlink,
                is_ignored: false,
            }))
        } else {
            Ok(None)
        }
    }

    /// Streams an `Entry` for each direct child of `abs_path`. The entry map
    /// is cloned so the stream doesn't hold the state lock; direct children
    /// are selected by comparing each key's parent to `abs_path`.
    async fn child_entries<'a>(
        &self,
        root_char_bag: CharBag,
        next_entry_id: &'a AtomicUsize,
        path: &'a Path,
        abs_path: &'a Path,
    ) -> Result<Pin<Box<dyn 'a + Stream<Item = Result<Entry>> + Send>>> {
        use futures::{future, stream};

        let state = self.state.read().await;
        Ok(stream::iter(state.entries.clone())
            .filter(move |(child_path, _)| future::ready(child_path.parent() == Some(abs_path)))
            .then(move |(child_abs_path, child_entry)| async move {
                // Yield so tests exercise interleaving with other tasks.
                smol::future::yield_now().await;
                let child_path = Arc::from(path.join(child_abs_path.file_name().unwrap()));
                Ok(Entry {
                    id: next_entry_id.fetch_add(1, SeqCst),
                    kind: if child_entry.is_dir {
                        EntryKind::PendingDir
                    } else {
                        EntryKind::File(char_bag_for_path(root_char_bag, &child_path))
                    },
                    path: child_path,
                    inode: child_entry.inode,
                    mtime: child_entry.mtime,
                    is_symlink: child_entry.is_symlink,
                    is_ignored: false,
                })
            })
            .boxed())
    }

    /// Returns the stored content of the file at `path`, failing for missing
    /// paths and for directories (whose `content` is `None`).
    async fn load(&self, path: &Path) -> Result<String> {
        let state = self.state.read().await;
        let text = state
            .entries
            .get(path)
            .and_then(|e| e.content.as_ref())
            .ok_or_else(|| anyhow!("file {:?} does not exist", path))?;
        Ok(text.clone())
    }

    /// Writes `text` to `path`, either overwriting an existing file (and
    /// bumping its mtime) or creating a new one with a fresh inode.
    async fn save(&self, path: &Path, text: &Rope) -> Result<()> {
        let mut state = self.state.write().await;
        state.validate_path(path)?;
        if let Some(entry) = state.entries.get_mut(path) {
            if entry.is_dir {
                Err(anyhow!("cannot overwrite a directory with a file"))
            } else {
                entry.content = Some(text.chunks().collect());
                entry.mtime = SystemTime::now();
                state.emit_event(path).await;
                Ok(())
            }
        } else {
            let inode = state.next_inode;
            state.next_inode += 1;
            let entry = InMemoryEntry {
                inode,
                mtime: SystemTime::now(),
                is_dir: false,
                is_symlink: false,
                content: Some(text.chunks().collect()),
            };
            state.entries.insert(path.to_path_buf(), entry);
            state.emit_event(path).await;
            Ok(())
        }
    }

    /// The in-memory tree has no symlink resolution; paths are already
    /// canonical.
    async fn canonicalize(&self, path: &Path) -> Result<PathBuf> {
        Ok(path.to_path_buf())
    }
}
431
/// Progress of the background scanner, broadcast to the foreground worktree.
#[derive(Clone, Debug)]
enum ScanState {
    // No scan in progress; the snapshot is up to date.
    Idle,
    // A scan is running; the snapshot is still being populated.
    Scanning,
    // The scan failed; the error is shared so the state stays `Clone`.
    Err(Arc<anyhow::Error>),
}
438
/// A directory tree open in the editor: either scanned from this machine's
/// filesystem (`Local`) or mirrored over RPC from a collaborator (`Remote`).
pub enum Worktree {
    Local(LocalWorktree),
    Remote(RemoteWorktree),
}
443
impl Entity for Worktree {
    type Event = ();

    /// When the model is dropped, unregister any RPC sharing state: remove
    /// the worktree from the client's shared set and notify the server that
    /// it is closed. A local worktree only has RPC state when it has been
    /// shared (`rpc` is `Option`); a remote worktree always does.
    fn release(&mut self, cx: &mut MutableAppContext) {
        let rpc = match self {
            Self::Local(tree) => tree.rpc.clone(),
            Self::Remote(tree) => Some((tree.rpc.clone(), tree.remote_id)),
        };

        if let Some((rpc, worktree_id)) = rpc {
            // Spawned and detached: release() is synchronous, so the
            // close message is sent best-effort in the background.
            cx.spawn(|_| async move {
                rpc.state
                    .write()
                    .await
                    .shared_worktrees
                    .remove(&worktree_id);
                if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await {
                    log::error!("error closing worktree {}: {}", worktree_id, err);
                }
            })
            .detach();
        }
    }
}
468
469impl Worktree {
470 pub fn local(
471 path: impl Into<Arc<Path>>,
472 languages: Arc<LanguageRegistry>,
473 cx: &mut ModelContext<Worktree>,
474 ) -> Self {
475 let fs = Arc::new(ProductionFs);
476 let (mut tree, scan_states_tx) =
477 LocalWorktree::new(path, languages, fs.clone(), Duration::from_millis(100), cx);
478 let (event_stream, event_stream_handle) = fsevent::EventStream::new(
479 &[tree.snapshot.abs_path.as_ref()],
480 Duration::from_millis(100),
481 );
482 let background_snapshot = tree.background_snapshot.clone();
483 std::thread::spawn(move || {
484 let scanner = BackgroundScanner::new(
485 background_snapshot,
486 scan_states_tx,
487 fs,
488 Arc::new(executor::Background::new()),
489 );
490 scanner.run(event_stream);
491 });
492 tree._event_stream_handle = Some(event_stream_handle);
493 Worktree::Local(tree)
494 }
495
496 #[cfg(any(test, feature = "test-support"))]
497 pub fn test(
498 path: impl Into<Arc<Path>>,
499 languages: Arc<LanguageRegistry>,
500 fs: Arc<InMemoryFs>,
501 cx: &mut ModelContext<Worktree>,
502 ) -> Self {
503 let (tree, scan_states_tx) =
504 LocalWorktree::new(path, languages, fs.clone(), Duration::ZERO, cx);
505 let background_snapshot = tree.background_snapshot.clone();
506 let fs = fs.clone();
507 let background = cx.background().clone();
508 cx.background()
509 .spawn(async move {
510 let events_rx = fs.events().await;
511 let scanner =
512 BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
513 scanner.run_test(events_rx).await;
514 })
515 .detach();
516 Worktree::Local(tree)
517 }
518
519 pub async fn open_remote(
520 rpc: rpc::Client,
521 id: u64,
522 access_token: String,
523 languages: Arc<LanguageRegistry>,
524 cx: &mut AsyncAppContext,
525 ) -> Result<ModelHandle<Self>> {
526 let response = rpc
527 .request(proto::OpenWorktree {
528 worktree_id: id,
529 access_token,
530 })
531 .await?;
532
533 Worktree::remote(response, rpc, languages, cx).await
534 }
535
536 async fn remote(
537 open_response: proto::OpenWorktreeResponse,
538 rpc: rpc::Client,
539 languages: Arc<LanguageRegistry>,
540 cx: &mut AsyncAppContext,
541 ) -> Result<ModelHandle<Self>> {
542 let worktree = open_response
543 .worktree
544 .ok_or_else(|| anyhow!("empty worktree"))?;
545
546 let remote_id = open_response.worktree_id;
547 let replica_id = open_response.replica_id as ReplicaId;
548 let peers = open_response.peers;
549 let root_char_bag: CharBag = worktree
550 .root_name
551 .chars()
552 .map(|c| c.to_ascii_lowercase())
553 .collect();
554 let root_name = worktree.root_name.clone();
555 let (entries_by_path, entries_by_id) = cx
556 .background()
557 .spawn(async move {
558 let mut entries_by_path_edits = Vec::new();
559 let mut entries_by_id_edits = Vec::new();
560 for entry in worktree.entries {
561 match Entry::try_from((&root_char_bag, entry)) {
562 Ok(entry) => {
563 entries_by_id_edits.push(Edit::Insert(PathEntry {
564 id: entry.id,
565 path: entry.path.clone(),
566 scan_id: 0,
567 }));
568 entries_by_path_edits.push(Edit::Insert(entry));
569 }
570 Err(err) => log::warn!("error for remote worktree entry {:?}", err),
571 }
572 }
573
574 let mut entries_by_path = SumTree::new();
575 let mut entries_by_id = SumTree::new();
576 entries_by_path.edit(entries_by_path_edits, &());
577 entries_by_id.edit(entries_by_id_edits, &());
578 (entries_by_path, entries_by_id)
579 })
580 .await;
581
582 let worktree = cx.update(|cx| {
583 cx.add_model(|cx: &mut ModelContext<Worktree>| {
584 let snapshot = Snapshot {
585 id: cx.model_id(),
586 scan_id: 0,
587 abs_path: Path::new("").into(),
588 root_name,
589 root_char_bag,
590 ignores: Default::default(),
591 entries_by_path,
592 entries_by_id,
593 removed_entry_ids: Default::default(),
594 next_entry_id: Default::default(),
595 };
596
597 let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
598 let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());
599
600 cx.background()
601 .spawn(async move {
602 while let Some(update) = updates_rx.recv().await {
603 let mut snapshot = snapshot_tx.borrow().clone();
604 if let Err(error) = snapshot.apply_update(update) {
605 log::error!("error applying worktree update: {}", error);
606 }
607 *snapshot_tx.borrow_mut() = snapshot;
608 }
609 })
610 .detach();
611
612 {
613 let mut snapshot_rx = snapshot_rx.clone();
614 cx.spawn_weak(|this, mut cx| async move {
615 while let Some(_) = snapshot_rx.recv().await {
616 if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
617 this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
618 } else {
619 break;
620 }
621 }
622 })
623 .detach();
624 }
625
626 Worktree::Remote(RemoteWorktree {
627 remote_id,
628 replica_id,
629 snapshot,
630 snapshot_rx,
631 updates_tx,
632 rpc: rpc.clone(),
633 open_buffers: Default::default(),
634 peers: peers
635 .into_iter()
636 .map(|p| (PeerId(p.peer_id), p.replica_id as ReplicaId))
637 .collect(),
638 languages,
639 })
640 })
641 });
642 rpc.state
643 .write()
644 .await
645 .shared_worktrees
646 .insert(open_response.worktree_id, worktree.downgrade());
647
648 Ok(worktree)
649 }
650
651 pub fn as_local(&self) -> Option<&LocalWorktree> {
652 if let Worktree::Local(worktree) = self {
653 Some(worktree)
654 } else {
655 None
656 }
657 }
658
659 pub fn as_local_mut(&mut self) -> Option<&mut LocalWorktree> {
660 if let Worktree::Local(worktree) = self {
661 Some(worktree)
662 } else {
663 None
664 }
665 }
666
667 pub fn as_remote_mut(&mut self) -> Option<&mut RemoteWorktree> {
668 if let Worktree::Remote(worktree) = self {
669 Some(worktree)
670 } else {
671 None
672 }
673 }
674
675 pub fn snapshot(&self) -> Snapshot {
676 match self {
677 Worktree::Local(worktree) => worktree.snapshot(),
678 Worktree::Remote(worktree) => worktree.snapshot(),
679 }
680 }
681
682 pub fn replica_id(&self) -> ReplicaId {
683 match self {
684 Worktree::Local(_) => 0,
685 Worktree::Remote(worktree) => worktree.replica_id,
686 }
687 }
688
689 pub fn add_peer(
690 &mut self,
691 envelope: TypedEnvelope<proto::AddPeer>,
692 cx: &mut ModelContext<Worktree>,
693 ) -> Result<()> {
694 match self {
695 Worktree::Local(worktree) => worktree.add_peer(envelope, cx),
696 Worktree::Remote(worktree) => worktree.add_peer(envelope, cx),
697 }
698 }
699
700 pub fn remove_peer(
701 &mut self,
702 envelope: TypedEnvelope<proto::RemovePeer>,
703 cx: &mut ModelContext<Worktree>,
704 ) -> Result<()> {
705 match self {
706 Worktree::Local(worktree) => worktree.remove_peer(envelope, cx),
707 Worktree::Remote(worktree) => worktree.remove_peer(envelope, cx),
708 }
709 }
710
711 pub fn peers(&self) -> &HashMap<PeerId, ReplicaId> {
712 match self {
713 Worktree::Local(worktree) => &worktree.peers,
714 Worktree::Remote(worktree) => &worktree.peers,
715 }
716 }
717
718 pub fn open_buffer(
719 &mut self,
720 path: impl AsRef<Path>,
721 cx: &mut ModelContext<Self>,
722 ) -> Task<Result<ModelHandle<Buffer>>> {
723 match self {
724 Worktree::Local(worktree) => worktree.open_buffer(path.as_ref(), cx),
725 Worktree::Remote(worktree) => worktree.open_buffer(path.as_ref(), cx),
726 }
727 }
728
729 #[cfg(feature = "test-support")]
730 pub fn has_open_buffer(&self, path: impl AsRef<Path>, cx: &AppContext) -> bool {
731 let open_buffers = match self {
732 Worktree::Local(worktree) => &worktree.open_buffers,
733 Worktree::Remote(worktree) => &worktree.open_buffers,
734 };
735
736 let path = path.as_ref();
737 open_buffers
738 .values()
739 .find(|buffer| {
740 if let Some(file) = buffer.upgrade(cx).and_then(|buffer| buffer.read(cx).file()) {
741 file.path.as_ref() == path
742 } else {
743 false
744 }
745 })
746 .is_some()
747 }
748
749 pub fn update_buffer(
750 &mut self,
751 envelope: proto::UpdateBuffer,
752 cx: &mut ModelContext<Self>,
753 ) -> Result<()> {
754 let open_buffers = match self {
755 Worktree::Local(worktree) => &worktree.open_buffers,
756 Worktree::Remote(worktree) => &worktree.open_buffers,
757 };
758 let buffer = open_buffers
759 .get(&(envelope.buffer_id as usize))
760 .and_then(|buf| buf.upgrade(&cx));
761
762 let buffer = if let Some(buffer) = buffer {
763 buffer
764 } else {
765 return if matches!(self, Worktree::Local(_)) {
766 Err(anyhow!(
767 "invalid buffer {} in update buffer message",
768 envelope.buffer_id
769 ))
770 } else {
771 Ok(())
772 };
773 };
774
775 let ops = envelope
776 .operations
777 .into_iter()
778 .map(|op| op.try_into())
779 .collect::<anyhow::Result<Vec<_>>>()?;
780 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
781 Ok(())
782 }
783
784 pub fn buffer_saved(
785 &mut self,
786 message: proto::BufferSaved,
787 cx: &mut ModelContext<Self>,
788 ) -> Result<()> {
789 if let Worktree::Remote(worktree) = self {
790 if let Some(buffer) = worktree
791 .open_buffers
792 .get(&(message.buffer_id as usize))
793 .and_then(|buf| buf.upgrade(&cx))
794 {
795 buffer.update(cx, |buffer, cx| {
796 let version = message.version.try_into()?;
797 let mtime = message
798 .mtime
799 .ok_or_else(|| anyhow!("missing mtime"))?
800 .into();
801 buffer.did_save(version, mtime, cx);
802 Result::<_, anyhow::Error>::Ok(())
803 })?;
804 }
805 Ok(())
806 } else {
807 Err(anyhow!(
808 "invalid buffer {} in buffer saved message",
809 message.buffer_id
810 ))
811 }
812 }
813
814 fn poll_snapshot(&mut self, cx: &mut ModelContext<Worktree>) {
815 let update_buffers = match self {
816 Self::Local(worktree) => {
817 let poll_interval = worktree.poll_interval;
818 worktree.snapshot = worktree.background_snapshot.lock().clone();
819 if worktree.is_scanning() {
820 if !worktree.poll_scheduled {
821 cx.spawn(|this, mut cx| async move {
822 if poll_interval.is_zero() {
823 smol::future::yield_now().await;
824 } else {
825 smol::Timer::after(poll_interval).await;
826 }
827 this.update(&mut cx, |this, cx| {
828 this.as_local_mut().unwrap().poll_scheduled = false;
829 this.poll_snapshot(cx);
830 })
831 })
832 .detach();
833 worktree.poll_scheduled = true;
834 }
835 false
836 } else {
837 true
838 }
839 }
840 Self::Remote(worktree) => {
841 worktree.snapshot = worktree.snapshot_rx.borrow().clone();
842 true
843 }
844 };
845
846 if update_buffers {
847 let mut buffers_to_delete = Vec::new();
848 for (buffer_id, buffer) in self.open_buffers() {
849 if let Some(buffer) = buffer.upgrade(&cx) {
850 buffer.update(cx, |buffer, cx| {
851 let buffer_is_clean = !buffer.is_dirty();
852
853 if let Some(file) = buffer.file_mut() {
854 let mut file_changed = false;
855
856 if let Some(entry) = file
857 .entry_id
858 .and_then(|entry_id| self.entry_for_id(entry_id))
859 {
860 if entry.path != file.path {
861 file.path = entry.path.clone();
862 file_changed = true;
863 }
864
865 if entry.mtime != file.mtime {
866 file.mtime = entry.mtime;
867 file_changed = true;
868 if let Some(worktree) = self.as_local() {
869 if buffer_is_clean {
870 let abs_path = worktree.absolutize(&file.path);
871 refresh_buffer(abs_path, &worktree.fs, cx);
872 }
873 }
874 }
875 } else if let Some(entry) = self.entry_for_path(&file.path) {
876 file.entry_id = Some(entry.id);
877 file.mtime = entry.mtime;
878 if let Some(worktree) = self.as_local() {
879 if buffer_is_clean {
880 let abs_path = worktree.absolutize(&file.path);
881 refresh_buffer(abs_path, &worktree.fs, cx);
882 }
883 }
884 file_changed = true;
885 } else if !file.is_deleted() {
886 if buffer_is_clean {
887 cx.emit(editor::buffer::Event::Dirtied);
888 }
889 file.entry_id = None;
890 file_changed = true;
891 }
892
893 if file_changed {
894 cx.emit(editor::buffer::Event::FileHandleChanged);
895 }
896 }
897 });
898 } else {
899 buffers_to_delete.push(*buffer_id);
900 }
901 }
902
903 for buffer_id in buffers_to_delete {
904 self.open_buffers_mut().remove(&buffer_id);
905 }
906 }
907
908 cx.notify();
909 }
910
911 fn open_buffers(&self) -> &HashMap<usize, WeakModelHandle<Buffer>> {
912 match self {
913 Self::Local(worktree) => &worktree.open_buffers,
914 Self::Remote(worktree) => &worktree.open_buffers,
915 }
916 }
917
918 fn open_buffers_mut(&mut self) -> &mut HashMap<usize, WeakModelHandle<Buffer>> {
919 match self {
920 Self::Local(worktree) => &mut worktree.open_buffers,
921 Self::Remote(worktree) => &mut worktree.open_buffers,
922 }
923 }
924}
925
// Deref to the foreground `Snapshot` so entry-lookup methods
// (`entry_for_id`, `entry_for_path`, …) are callable directly on a worktree.
impl Deref for Worktree {
    type Target = Snapshot;

    fn deref(&self) -> &Self::Target {
        match self {
            Worktree::Local(worktree) => &worktree.snapshot,
            Worktree::Remote(worktree) => &worktree.snapshot,
        }
    }
}
936
/// A worktree scanned from this machine's filesystem.
pub struct LocalWorktree {
    // Foreground copy of the tree, refreshed by `poll_snapshot`.
    snapshot: Snapshot,
    // Snapshot mutated by the background scanner; shared via a mutex.
    background_snapshot: Arc<Mutex<Snapshot>>,
    // When sharing, completed snapshots are queued here for broadcast.
    snapshots_to_send_tx: Option<Sender<Snapshot>>,
    // Latest scanner state (Idle / Scanning / Err), observable as a watch.
    last_scan_state_rx: watch::Receiver<ScanState>,
    // Keeps the FS event stream alive; `None` for in-memory test trees.
    _event_stream_handle: Option<fsevent::Handle>,
    // True while a delayed re-poll of the background snapshot is pending.
    poll_scheduled: bool,
    // Present only while the worktree is shared: client plus remote id.
    rpc: Option<(rpc::Client, u64)>,
    // Weak handles to every buffer opened from this tree, keyed by buffer id.
    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
    // Strong handles to buffers guests have opened, keyed by peer then id.
    shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
    // Collaborators and their replica ids.
    peers: HashMap<PeerId, ReplicaId>,
    languages: Arc<LanguageRegistry>,
    fs: Arc<dyn Fs>,
    // Delay between snapshot polls while scanning (zero in tests).
    poll_interval: Duration,
}
952
953impl LocalWorktree {
    /// Builds a `LocalWorktree` with an empty snapshot and returns it along
    /// with the sender the background scanner uses to report scan states.
    /// Also spawns a foreground task that forwards each scan state into
    /// `last_scan_state_rx`, re-polls the snapshot, and — when a scan
    /// finishes on a shared tree — queues the snapshot for broadcast.
    fn new(
        path: impl Into<Arc<Path>>,
        languages: Arc<LanguageRegistry>,
        fs: Arc<dyn Fs>,
        poll_interval: Duration,
        cx: &mut ModelContext<Worktree>,
    ) -> (Self, Sender<ScanState>) {
        let abs_path = path.into();
        let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
        let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
        let id = cx.model_id();
        let snapshot = Snapshot {
            id,
            scan_id: 0,
            abs_path,
            root_name: Default::default(),
            root_char_bag: Default::default(),
            ignores: Default::default(),
            entries_by_path: Default::default(),
            entries_by_id: Default::default(),
            removed_entry_ids: Default::default(),
            next_entry_id: Default::default(),
        };

        let tree = Self {
            snapshot: snapshot.clone(),
            background_snapshot: Arc::new(Mutex::new(snapshot)),
            snapshots_to_send_tx: None,
            last_scan_state_rx,
            _event_stream_handle: None,
            poll_scheduled: false,
            open_buffers: Default::default(),
            shared_buffers: Default::default(),
            peers: Default::default(),
            rpc: None,
            languages,
            fs,
            poll_interval,
        };

        // Weak spawn: the loop exits when the worktree model is released.
        cx.spawn_weak(|this, mut cx| async move {
            while let Ok(scan_state) = scan_states_rx.recv().await {
                if let Some(handle) = cx.read(|cx| this.upgrade(&cx)) {
                    handle.update(&mut cx, |this, cx| {
                        last_scan_state_tx.blocking_send(scan_state).ok();
                        this.poll_snapshot(cx);
                        let tree = this.as_local_mut().unwrap();
                        if !tree.is_scanning() {
                            // Scan finished: if shared, queue this snapshot
                            // to be sent to collaborators.
                            if let Some(snapshots_to_send_tx) = tree.snapshots_to_send_tx.clone() {
                                if let Err(err) =
                                    smol::block_on(snapshots_to_send_tx.send(tree.snapshot()))
                                {
                                    log::error!("error submitting snapshot to send {}", err);
                                }
                            }
                        }
                    });
                } else {
                    break;
                }
            }
        })
        .detach();

        (tree, scan_states_tx)
    }
1020
    /// Opens the buffer for `path`, returning an existing open buffer when
    /// one is already backed by that path, otherwise loading the file from
    /// disk and registering a new buffer.
    pub fn open_buffer(
        &mut self,
        path: &Path,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        let handle = cx.handle();

        // If there is already a buffer for the given path, then return it.
        // The `retain` doubles as garbage collection: dropped buffers are
        // pruned from the registry while we search.
        let mut existing_buffer = None;
        self.open_buffers.retain(|_buffer_id, buffer| {
            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                if let Some(file) = buffer.read(cx.as_ref()).file() {
                    if file.worktree_id() == handle.id() && file.path.as_ref() == path {
                        existing_buffer = Some(buffer);
                    }
                }
                true
            } else {
                false
            }
        });

        let languages = self.languages.clone();
        let path = Arc::from(path);
        cx.spawn(|this, mut cx| async move {
            if let Some(existing_buffer) = existing_buffer {
                Ok(existing_buffer)
            } else {
                // Load the file contents and an up-to-date `File` entry.
                let (file, contents) = this
                    .update(&mut cx, |this, cx| this.as_local().unwrap().load(&path, cx))
                    .await?;
                // Language is chosen from the path (e.g. extension).
                let language = languages.select_language(&path).cloned();
                let buffer = cx.add_model(|cx| {
                    Buffer::from_history(0, History::new(contents.into()), Some(file), language, cx)
                });
                this.update(&mut cx, |this, _| {
                    let this = this
                        .as_local_mut()
                        .ok_or_else(|| anyhow!("must be a local worktree"))?;
                    this.open_buffers.insert(buffer.id(), buffer.downgrade());
                    Ok(buffer)
                })
            }
        })
    }
1066
    /// Handles a guest's `OpenBuffer` request: opens the buffer locally,
    /// records a strong handle under the requesting peer (so the buffer
    /// stays alive while the guest has it open), and replies with the
    /// serialized buffer state.
    pub fn open_remote_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::OpenBuffer>,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<proto::OpenBufferResponse>> {
        let peer_id = envelope.original_sender_id();
        let path = Path::new(&envelope.payload.path);

        let buffer = self.open_buffer(path, cx);

        cx.spawn(|this, mut cx| async move {
            let buffer = buffer.await?;
            this.update(&mut cx, |this, cx| {
                this.as_local_mut()
                    .unwrap()
                    .shared_buffers
                    .entry(peer_id?)
                    .or_default()
                    .insert(buffer.id() as u64, buffer.clone());

                Ok(proto::OpenBufferResponse {
                    buffer: Some(buffer.update(cx.as_mut(), |buffer, cx| buffer.to_proto(cx))),
                })
            })
        })
    }
1093
1094 pub fn close_remote_buffer(
1095 &mut self,
1096 envelope: TypedEnvelope<proto::CloseBuffer>,
1097 _: &mut ModelContext<Worktree>,
1098 ) -> Result<()> {
1099 if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
1100 shared_buffers.remove(&envelope.payload.buffer_id);
1101 }
1102
1103 Ok(())
1104 }
1105
1106 pub fn add_peer(
1107 &mut self,
1108 envelope: TypedEnvelope<proto::AddPeer>,
1109 cx: &mut ModelContext<Worktree>,
1110 ) -> Result<()> {
1111 let peer = envelope.payload.peer.ok_or_else(|| anyhow!("empty peer"))?;
1112 self.peers
1113 .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
1114 cx.notify();
1115 Ok(())
1116 }
1117
    /// Unregisters a departed collaborator: drops the buffers shared with
    /// them, removes their replica (selections etc.) from every open buffer,
    /// and notifies observers.
    pub fn remove_peer(
        &mut self,
        envelope: TypedEnvelope<proto::RemovePeer>,
        cx: &mut ModelContext<Worktree>,
    ) -> Result<()> {
        let peer_id = PeerId(envelope.payload.peer_id);
        let replica_id = self
            .peers
            .remove(&peer_id)
            .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
        self.shared_buffers.remove(&peer_id);
        for (_, buffer) in &self.open_buffers {
            if let Some(buffer) = buffer.upgrade(&cx) {
                buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
            }
        }
        cx.notify();
        Ok(())
    }
1137
1138 pub fn scan_complete(&self) -> impl Future<Output = ()> {
1139 let mut scan_state_rx = self.last_scan_state_rx.clone();
1140 async move {
1141 let mut scan_state = Some(scan_state_rx.borrow().clone());
1142 while let Some(ScanState::Scanning) = scan_state {
1143 scan_state = scan_state_rx.recv().await;
1144 }
1145 }
1146 }
1147
1148 fn is_scanning(&self) -> bool {
1149 if let ScanState::Scanning = *self.last_scan_state_rx.borrow() {
1150 true
1151 } else {
1152 false
1153 }
1154 }
1155
    /// A clone of the current foreground snapshot.
    pub fn snapshot(&self) -> Snapshot {
        self.snapshot.clone()
    }
1159
    /// Absolute filesystem path of the worktree root.
    pub fn abs_path(&self) -> &Path {
        self.snapshot.abs_path.as_ref()
    }
1163
    /// Whether the given absolute path lies inside this worktree's root.
    pub fn contains_abs_path(&self, path: &Path) -> bool {
        path.starts_with(&self.snapshot.abs_path)
    }
1167
1168 fn absolutize(&self, path: &Path) -> PathBuf {
1169 if path.file_name().is_some() {
1170 self.snapshot.abs_path.join(path)
1171 } else {
1172 self.snapshot.abs_path.to_path_buf()
1173 }
1174 }
1175
    /// Loads the file at `path` from disk, returning its `File` handle and
    /// text contents.
    fn load(&self, path: &Path, cx: &mut ModelContext<Worktree>) -> Task<Result<(File, String)>> {
        let handle = cx.handle();
        let path = Arc::from(path);
        let abs_path = self.absolutize(&path);
        let background_snapshot = self.background_snapshot.clone();
        let fs = self.fs.clone();
        cx.spawn(|this, mut cx| async move {
            let text = fs.load(&abs_path).await?;
            // Eagerly populate the snapshot with an updated entry for the loaded file
            let entry = refresh_entry(fs.as_ref(), &background_snapshot, path, &abs_path).await?;
            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
            Ok((File::new(entry.id, handle, entry.path, entry.mtime), text))
        })
    }
1190
    /// Saves `text` to `path` and registers `buffer` as an open buffer of
    /// this worktree, resolving to a `File` that points at the saved entry.
    pub fn save_buffer_as(
        &self,
        buffer: ModelHandle<Buffer>,
        path: impl Into<Arc<Path>>,
        text: Rope,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<File>> {
        let save = self.save(path, text, cx);
        cx.spawn(|this, mut cx| async move {
            let entry = save.await?;
            this.update(&mut cx, |this, cx| {
                // Track the buffer under its id so it participates in
                // worktree-wide operations (e.g. peer removal).
                this.as_local_mut()
                    .unwrap()
                    .open_buffers
                    .insert(buffer.id(), buffer.downgrade());
                Ok(File::new(entry.id, cx.handle(), entry.path, entry.mtime))
            })
        })
    }
1210
    /// Writes `text` to `path` on a background thread and refreshes the
    /// corresponding snapshot entry, resolving to the refreshed entry.
    fn save(
        &self,
        path: impl Into<Arc<Path>>,
        text: Rope,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<Entry>> {
        let path = path.into();
        let abs_path = self.absolutize(&path);
        let background_snapshot = self.background_snapshot.clone();
        let fs = self.fs.clone();
        // Perform the disk write and the entry refresh off the main thread.
        let save = cx.background().spawn(async move {
            fs.save(&abs_path, &text).await?;
            refresh_entry(fs.as_ref(), &background_snapshot, path.clone(), &abs_path).await
        });

        cx.spawn(|this, mut cx| async move {
            let entry = save.await?;
            // Propagate the refreshed background snapshot to the foreground.
            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
            Ok(entry)
        })
    }
1232
    /// Shares this worktree with collaborators: sends the full snapshot to
    /// the server, then streams incremental snapshot diffs from a background
    /// task. Resolves to the server-assigned worktree id and access token.
    pub fn share(
        &mut self,
        rpc: rpc::Client,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<anyhow::Result<(u64, String)>> {
        let snapshot = self.snapshot();
        let share_request = self.share_request(cx);
        let handle = cx.handle();
        cx.spawn(|this, mut cx| async move {
            let share_request = share_request.await;
            let share_response = rpc.request(share_request).await?;

            // Register this worktree so incoming RPC messages can be routed
            // back to it by id.
            rpc.state
                .write()
                .await
                .shared_worktrees
                .insert(share_response.worktree_id, handle.downgrade());

            log::info!("sharing worktree {:?}", share_response);
            let (snapshots_to_send_tx, snapshots_to_send_rx) =
                smol::channel::unbounded::<Snapshot>();

            // Background task: diff each queued snapshot against the last one
            // successfully sent and push the delta to the server.
            cx.background()
                .spawn({
                    let rpc = rpc.clone();
                    let worktree_id = share_response.worktree_id;
                    async move {
                        let mut prev_snapshot = snapshot;
                        while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
                            let message = snapshot.build_update(&prev_snapshot, worktree_id);
                            match rpc.send(message).await {
                                // Only advance the baseline on success, so a
                                // failed diff is retried implicitly next time.
                                Ok(()) => prev_snapshot = snapshot,
                                Err(err) => log::error!("error sending snapshot diff {}", err),
                            }
                        }
                    }
                })
                .detach();

            this.update(&mut cx, |worktree, _| {
                let worktree = worktree.as_local_mut().unwrap();
                worktree.rpc = Some((rpc, share_response.worktree_id));
                worktree.snapshots_to_send_tx = Some(snapshots_to_send_tx);
            });

            Ok((share_response.worktree_id, share_response.access_token))
        })
    }
1281
1282 fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<proto::ShareWorktree> {
1283 let snapshot = self.snapshot();
1284 let root_name = self.root_name.clone();
1285 cx.background().spawn(async move {
1286 let entries = snapshot
1287 .entries_by_path
1288 .cursor::<(), ()>()
1289 .map(Into::into)
1290 .collect();
1291 proto::ShareWorktree {
1292 worktree: Some(proto::Worktree { root_name, entries }),
1293 }
1294 })
1295 }
1296}
1297
1298pub fn refresh_buffer(abs_path: PathBuf, fs: &Arc<dyn Fs>, cx: &mut ModelContext<Buffer>) {
1299 let fs = fs.clone();
1300 cx.spawn(|buffer, mut cx| async move {
1301 let new_text = fs.load(&abs_path).await;
1302 match new_text {
1303 Err(error) => log::error!("error refreshing buffer after file changed: {}", error),
1304 Ok(new_text) => {
1305 buffer
1306 .update(&mut cx, |buffer, cx| {
1307 buffer.set_text_from_disk(new_text.into(), cx)
1308 })
1309 .await;
1310 }
1311 }
1312 })
1313 .detach()
1314}
1315
/// Lets a `LocalWorktree` be used anywhere a `Snapshot` is expected.
impl Deref for LocalWorktree {
    type Target = Snapshot;

    fn deref(&self) -> &Self::Target {
        &self.snapshot
    }
}
1323
1324impl fmt::Debug for LocalWorktree {
1325 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1326 self.snapshot.fmt(f)
1327 }
1328}
1329
/// A worktree whose file system lives on another machine and is mirrored
/// locally via RPC snapshot updates.
pub struct RemoteWorktree {
    // Identifier assigned by the server for this worktree.
    remote_id: u64,
    // Most recently observed snapshot of the remote file tree.
    snapshot: Snapshot,
    // Receives fresh snapshots as remote updates are applied.
    snapshot_rx: watch::Receiver<Snapshot>,
    rpc: rpc::Client,
    // Sender for incoming `UpdateWorktree` messages; presumably drained by a
    // background task that applies them — TODO confirm (consumer not visible
    // in this chunk).
    updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
    // Replica id assigned to this client for collaborative editing.
    replica_id: ReplicaId,
    // Buffers currently open for this worktree, keyed by remote buffer id.
    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
    // Replica ids of the other connected collaborators.
    peers: HashMap<PeerId, ReplicaId>,
    languages: Arc<LanguageRegistry>,
}
1341
1342impl RemoteWorktree {
1343 pub fn open_buffer(
1344 &mut self,
1345 path: &Path,
1346 cx: &mut ModelContext<Worktree>,
1347 ) -> Task<Result<ModelHandle<Buffer>>> {
1348 let handle = cx.handle();
1349 let mut existing_buffer = None;
1350 self.open_buffers.retain(|_buffer_id, buffer| {
1351 if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
1352 if let Some(file) = buffer.read(cx.as_ref()).file() {
1353 if file.worktree_id() == handle.id() && file.path.as_ref() == path {
1354 existing_buffer = Some(buffer);
1355 }
1356 }
1357 true
1358 } else {
1359 false
1360 }
1361 });
1362
1363 let rpc = self.rpc.clone();
1364 let languages = self.languages.clone();
1365 let replica_id = self.replica_id;
1366 let remote_worktree_id = self.remote_id;
1367 let path = path.to_string_lossy().to_string();
1368 cx.spawn(|this, mut cx| async move {
1369 if let Some(existing_buffer) = existing_buffer {
1370 Ok(existing_buffer)
1371 } else {
1372 let entry = this
1373 .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned())
1374 .ok_or_else(|| anyhow!("file does not exist"))?;
1375 let file = File::new(entry.id, handle, entry.path, entry.mtime);
1376 let language = languages.select_language(&path).cloned();
1377 let response = rpc
1378 .request(proto::OpenBuffer {
1379 worktree_id: remote_worktree_id as u64,
1380 path,
1381 })
1382 .await?;
1383 let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
1384 let buffer_id = remote_buffer.id;
1385 let buffer = cx.add_model(|cx| {
1386 Buffer::from_proto(replica_id, remote_buffer, Some(file), language, cx).unwrap()
1387 });
1388 this.update(&mut cx, |this, _| {
1389 let this = this.as_remote_mut().unwrap();
1390 this.open_buffers
1391 .insert(buffer_id as usize, buffer.downgrade());
1392 });
1393 Ok(buffer)
1394 }
1395 })
1396 }
1397
1398 fn snapshot(&self) -> Snapshot {
1399 self.snapshot.clone()
1400 }
1401
1402 pub fn add_peer(
1403 &mut self,
1404 envelope: TypedEnvelope<proto::AddPeer>,
1405 cx: &mut ModelContext<Worktree>,
1406 ) -> Result<()> {
1407 let peer = envelope.payload.peer.ok_or_else(|| anyhow!("empty peer"))?;
1408 self.peers
1409 .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
1410 cx.notify();
1411 Ok(())
1412 }
1413
1414 pub fn remove_peer(
1415 &mut self,
1416 envelope: TypedEnvelope<proto::RemovePeer>,
1417 cx: &mut ModelContext<Worktree>,
1418 ) -> Result<()> {
1419 let peer_id = PeerId(envelope.payload.peer_id);
1420 let replica_id = self
1421 .peers
1422 .remove(&peer_id)
1423 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
1424 for (_, buffer) in &self.open_buffers {
1425 if let Some(buffer) = buffer.upgrade(&cx) {
1426 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
1427 }
1428 }
1429 cx.notify();
1430 Ok(())
1431 }
1432}
1433
/// A point-in-time view of a worktree's entries. Cloning is cheap because the
/// entry trees are `SumTree`s that share structure between clones.
#[derive(Clone)]
pub struct Snapshot {
    id: usize,
    // Incremented on every scan pass / applied update; used to detect which
    // entries changed between two snapshots.
    scan_id: usize,
    // Absolute path of the worktree root on disk.
    abs_path: Arc<Path>,
    // File name of the root, plus a trailing slash if it is a directory.
    root_name: String,
    // Lowercased characters of the root name, used for fuzzy matching.
    root_char_bag: CharBag,
    // Loaded `.gitignore` files keyed by their containing directory, along
    // with the scan id at which each was last (re)loaded.
    ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
    // All entries, ordered by path.
    entries_by_path: SumTree<Entry>,
    // The same entries, ordered by id, for id-based lookup and diffing.
    entries_by_id: SumTree<PathEntry>,
    // Maps inodes of removed entries to their old ids so a re-created file
    // can reuse its previous entry id (see `reuse_entry_id`).
    removed_entry_ids: HashMap<u64, usize>,
    next_entry_id: Arc<AtomicUsize>,
}
1447
impl Snapshot {
    /// Computes the diff between `self` (newer) and `other` (older) as an
    /// `UpdateWorktree` message: entries created or rescanned since `other`
    /// go into `updated_entries`; entries present only in `other` go into
    /// `removed_entries`.
    pub fn build_update(&self, other: &Self, worktree_id: u64) -> proto::UpdateWorktree {
        let mut updated_entries = Vec::new();
        let mut removed_entries = Vec::new();
        // Both cursors yield `PathEntry` records in ascending id order, so
        // the two snapshots can be diffed in a single merge-style pass.
        let mut self_entries = self.entries_by_id.cursor::<(), ()>().peekable();
        let mut other_entries = other.entries_by_id.cursor::<(), ()>().peekable();
        loop {
            match (self_entries.peek(), other_entries.peek()) {
                (Some(self_entry), Some(other_entry)) => match self_entry.id.cmp(&other_entry.id) {
                    Ordering::Less => {
                        // Present only in `self`: a newly-created entry.
                        let entry = self.entry_for_id(self_entry.id).unwrap().into();
                        updated_entries.push(entry);
                        self_entries.next();
                    }
                    Ordering::Equal => {
                        // Present in both: send it only if it was rescanned.
                        if self_entry.scan_id != other_entry.scan_id {
                            let entry = self.entry_for_id(self_entry.id).unwrap().into();
                            updated_entries.push(entry);
                        }

                        self_entries.next();
                        other_entries.next();
                    }
                    Ordering::Greater => {
                        // Present only in `other`: it has been removed.
                        removed_entries.push(other_entry.id as u64);
                        other_entries.next();
                    }
                },
                (Some(self_entry), None) => {
                    let entry = self.entry_for_id(self_entry.id).unwrap().into();
                    updated_entries.push(entry);
                    self_entries.next();
                }
                (None, Some(other_entry)) => {
                    removed_entries.push(other_entry.id as u64);
                    other_entries.next();
                }
                (None, None) => break,
            }
        }

        proto::UpdateWorktree {
            updated_entries,
            removed_entries,
            worktree_id,
        }
    }

    /// Applies a remote `UpdateWorktree` diff to this snapshot, removing and
    /// upserting entries in both the path-ordered and id-ordered trees.
    fn apply_update(&mut self, update: proto::UpdateWorktree) -> Result<()> {
        self.scan_id += 1;
        let scan_id = self.scan_id;

        let mut entries_by_path_edits = Vec::new();
        let mut entries_by_id_edits = Vec::new();
        for entry_id in update.removed_entries {
            let entry_id = entry_id as usize;
            let entry = self
                .entry_for_id(entry_id)
                .ok_or_else(|| anyhow!("unknown entry"))?;
            entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
            entries_by_id_edits.push(Edit::Remove(entry.id));
        }

        for entry in update.updated_entries {
            let entry = Entry::try_from((&self.root_char_bag, entry))?;
            // If the entry moved, drop its record at the old path first.
            if let Some(PathEntry { path, .. }) = self.entries_by_id.get(&entry.id, &()) {
                entries_by_path_edits.push(Edit::Remove(PathKey(path.clone())));
            }
            entries_by_id_edits.push(Edit::Insert(PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                scan_id,
            }));
            entries_by_path_edits.push(Edit::Insert(entry));
        }

        self.entries_by_path.edit(entries_by_path_edits, &());
        self.entries_by_id.edit(entries_by_id_edits, &());

        Ok(())
    }

    /// Total number of files (ignored or not) in the snapshot.
    pub fn file_count(&self) -> usize {
        self.entries_by_path.summary().file_count
    }

    /// Number of files not excluded by any `.gitignore`.
    pub fn visible_file_count(&self) -> usize {
        self.entries_by_path.summary().visible_file_count
    }

    /// Iterates over all files, starting at the `start`-th file.
    pub fn files(&self, start: usize) -> FileIter {
        FileIter::all(self, start)
    }

    /// Iterates over the paths of all entries except the root itself.
    pub fn paths(&self) -> impl Iterator<Item = &Arc<Path>> {
        let empty_path = Path::new("");
        self.entries_by_path
            .cursor::<(), ()>()
            .filter(move |entry| entry.path.as_ref() != empty_path)
            .map(|entry| entry.path())
    }

    /// Iterates over non-ignored files, starting at the `start`-th one.
    pub fn visible_files(&self, start: usize) -> FileIter {
        FileIter::visible(self, start)
    }

    /// Iterates over the direct children of the directory at `path`.
    fn child_entries<'a>(&'a self, path: &'a Path) -> ChildEntriesIter<'a> {
        ChildEntriesIter::new(path, self)
    }

    /// The entry for the worktree root. Panics if the snapshot is empty.
    pub fn root_entry(&self) -> &Entry {
        self.entry_for_path("").unwrap()
    }

    /// Returns the filename of the snapshot's root, plus a trailing slash if the snapshot's root is
    /// a directory.
    pub fn root_name(&self) -> &str {
        &self.root_name
    }

    /// Looks up the entry at the given root-relative path, if any.
    fn entry_for_path(&self, path: impl AsRef<Path>) -> Option<&Entry> {
        let mut cursor = self.entries_by_path.cursor::<_, ()>();
        if cursor.seek(&PathSearch::Exact(path.as_ref()), Bias::Left, &()) {
            cursor.item()
        } else {
            None
        }
    }

    /// Looks up an entry by id, going through the id-ordered tree to find its
    /// path and then resolving the path.
    fn entry_for_id(&self, id: usize) -> Option<&Entry> {
        let entry = self.entries_by_id.get(&id, &())?;
        self.entry_for_path(&entry.path)
    }

    /// The inode of the entry at `path`, if such an entry exists.
    pub fn inode_for_path(&self, path: impl AsRef<Path>) -> Option<u64> {
        self.entry_for_path(path.as_ref()).map(|e| e.inode())
    }

    /// Inserts or replaces a single entry, keeping both trees in sync and
    /// (re)loading the corresponding `.gitignore` if the entry is one.
    fn insert_entry(&mut self, mut entry: Entry) -> Entry {
        if !entry.is_dir() && entry.path().file_name() == Some(&GITIGNORE) {
            let (ignore, err) = Gitignore::new(self.abs_path.join(entry.path()));
            // `Gitignore::new` reports parse problems via a side error while
            // still returning a usable matcher.
            if let Some(err) = err {
                log::error!("error in ignore file {:?} - {:?}", entry.path(), err);
            }

            let ignore_dir_path = entry.path().parent().unwrap();
            self.ignores
                .insert(ignore_dir_path.into(), (Arc::new(ignore), self.scan_id));
        }

        self.reuse_entry_id(&mut entry);
        self.entries_by_path.insert_or_replace(entry.clone(), &());
        self.entries_by_id.insert_or_replace(
            PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                scan_id: self.scan_id,
            },
            &(),
        );
        entry
    }

    /// Converts the `PendingDir` at `parent_path` into a scanned `Dir` and
    /// inserts its children, recording `ignore` if the directory contained a
    /// `.gitignore`.
    fn populate_dir(
        &mut self,
        parent_path: Arc<Path>,
        entries: impl IntoIterator<Item = Entry>,
        ignore: Option<Arc<Gitignore>>,
    ) {
        let mut parent_entry = self
            .entries_by_path
            .get(&PathKey(parent_path.clone()), &())
            .unwrap()
            .clone();
        if let Some(ignore) = ignore {
            self.ignores.insert(parent_path, (ignore, self.scan_id));
        }
        // Only a pending directory may be populated; anything else indicates
        // a scanner bug.
        if matches!(parent_entry.kind, EntryKind::PendingDir) {
            parent_entry.kind = EntryKind::Dir;
        } else {
            unreachable!();
        }

        let mut entries_by_path_edits = vec![Edit::Insert(parent_entry)];
        let mut entries_by_id_edits = Vec::new();

        for mut entry in entries {
            self.reuse_entry_id(&mut entry);
            entries_by_id_edits.push(Edit::Insert(PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                scan_id: self.scan_id,
            }));
            entries_by_path_edits.push(Edit::Insert(entry));
        }

        self.entries_by_path.edit(entries_by_path_edits, &());
        self.entries_by_id.edit(entries_by_id_edits, &());
    }

    /// Keeps entry ids stable across rescans: prefer the id previously used
    /// for the same inode, otherwise the id of the existing entry at the same
    /// path.
    fn reuse_entry_id(&mut self, entry: &mut Entry) {
        if let Some(removed_entry_id) = self.removed_entry_ids.remove(&entry.inode) {
            entry.id = removed_entry_id;
        } else if let Some(existing_entry) = self.entry_for_path(&entry.path) {
            entry.id = existing_entry.id;
        }
    }

    /// Removes the entry at `path` and everything beneath it, remembering the
    /// removed ids by inode so they can be reused if the files reappear.
    fn remove_path(&mut self, path: &Path) {
        let mut new_entries;
        let removed_entry_ids;
        {
            // Split the path-ordered tree into: everything before `path`, the
            // removed subtree, and everything after the subtree.
            let mut cursor = self.entries_by_path.cursor::<_, ()>();
            new_entries = cursor.slice(&PathSearch::Exact(path), Bias::Left, &());
            removed_entry_ids = cursor.slice(&PathSearch::Successor(path), Bias::Left, &());
            new_entries.push_tree(cursor.suffix(&()), &());
        }
        self.entries_by_path = new_entries;

        let mut entries_by_id_edits = Vec::new();
        for entry in removed_entry_ids.cursor::<(), ()>() {
            // Remember the highest id seen for this inode so it can be reused.
            let removed_entry_id = self
                .removed_entry_ids
                .entry(entry.inode)
                .or_insert(entry.id);
            *removed_entry_id = cmp::max(*removed_entry_id, entry.id);
            entries_by_id_edits.push(Edit::Remove(entry.id));
        }
        self.entries_by_id.edit(entries_by_id_edits, &());

        // Removing a `.gitignore` invalidates its cached rules: bump the scan
        // id on the cached copy so it is treated as changed.
        if path.file_name() == Some(&GITIGNORE) {
            if let Some((_, scan_id)) = self.ignores.get_mut(path.parent().unwrap()) {
                *scan_id = self.scan_id;
            }
        }
    }

    /// Builds the stack of `.gitignore`s that apply to `path`, walking the
    /// ancestors from the root downward. Returns `IgnoreStack::all()` if some
    /// ancestor (or the path itself) is already ignored.
    fn ignore_stack_for_path(&self, path: &Path, is_dir: bool) -> Arc<IgnoreStack> {
        let mut new_ignores = Vec::new();
        for ancestor in path.ancestors().skip(1) {
            if let Some((ignore, _)) = self.ignores.get(ancestor) {
                new_ignores.push((ancestor, Some(ignore.clone())));
            } else {
                new_ignores.push((ancestor, None));
            }
        }

        let mut ignore_stack = IgnoreStack::none();
        for (parent_path, ignore) in new_ignores.into_iter().rev() {
            if ignore_stack.is_path_ignored(&parent_path, true) {
                // An ignored directory ignores everything beneath it.
                ignore_stack = IgnoreStack::all();
                break;
            } else if let Some(ignore) = ignore {
                ignore_stack = ignore_stack.append(Arc::from(parent_path), ignore);
            }
        }

        if ignore_stack.is_path_ignored(path, is_dir) {
            ignore_stack = IgnoreStack::all();
        }

        ignore_stack
    }
}
1712
1713impl fmt::Debug for Snapshot {
1714 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1715 for entry in self.entries_by_path.cursor::<(), ()>() {
1716 for _ in entry.path().ancestors().skip(1) {
1717 write!(f, " ")?;
1718 }
1719 writeln!(f, "{:?} (inode: {})", entry.path(), entry.inode())?;
1720 }
1721 Ok(())
1722 }
1723}
1724
/// A handle to a file within a worktree, as seen by an open buffer.
#[derive(Clone, PartialEq)]
pub struct File {
    // `None` once the underlying entry has been deleted (see `is_deleted`).
    entry_id: Option<usize>,
    worktree: ModelHandle<Worktree>,
    // Path relative to the worktree root.
    pub path: Arc<Path>,
    // Modification time of the entry this handle was created from.
    pub mtime: SystemTime,
}
1732
impl File {
    /// Creates a handle to the worktree entry with the given id, path, and
    /// modification time.
    pub fn new(
        entry_id: usize,
        worktree: ModelHandle<Worktree>,
        path: Arc<Path>,
        mtime: SystemTime,
    ) -> Self {
        Self {
            entry_id: Some(entry_id),
            worktree,
            path,
            mtime,
        }
    }

    /// Forwards a buffer operation to collaborators over RPC. Does nothing
    /// for a local worktree that is not shared. Send errors are logged, not
    /// returned.
    pub fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) {
        self.worktree.update(cx, |worktree, cx| {
            // A local worktree only has an rpc connection once it is shared.
            if let Some((rpc, remote_id)) = match worktree {
                Worktree::Local(worktree) => worktree.rpc.clone(),
                Worktree::Remote(worktree) => Some((worktree.rpc.clone(), worktree.remote_id)),
            } {
                cx.spawn(|_, _| async move {
                    if let Err(error) = rpc
                        .send(proto::UpdateBuffer {
                            worktree_id: remote_id,
                            buffer_id,
                            // Wraps the single operation in an `Option` so it
                            // can be borrowed through `iter()`, producing a
                            // one-element list.
                            operations: Some(operation).iter().map(Into::into).collect(),
                        })
                        .await
                    {
                        log::error!("error sending buffer operation: {}", error);
                    }
                })
                .detach();
            }
        });
    }

    /// Tells the remote host that this client closed the buffer. No-op for
    /// local worktrees.
    pub fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) {
        self.worktree.update(cx, |worktree, cx| {
            if let Worktree::Remote(worktree) = worktree {
                let worktree_id = worktree.remote_id;
                let rpc = worktree.rpc.clone();
                cx.background()
                    .spawn(async move {
                        if let Err(error) = rpc
                            .send(proto::CloseBuffer {
                                worktree_id,
                                buffer_id,
                            })
                            .await
                        {
                            log::error!("error closing remote buffer: {}", error);
                        };
                    })
                    .detach();
            }
        });
    }

    /// Returns this file's path relative to the root of its worktree.
    pub fn path(&self) -> Arc<Path> {
        self.path.clone()
    }

    /// The file's absolute path: the worktree root joined with `self.path`.
    pub fn abs_path(&self, cx: &AppContext) -> PathBuf {
        self.worktree.read(cx).abs_path.join(&self.path)
    }

    /// Returns the last component of this handle's absolute path. If this handle refers to the root
    /// of its worktree, then this method will return the name of the worktree itself.
    pub fn file_name<'a>(&'a self, cx: &'a AppContext) -> Option<OsString> {
        self.path
            .file_name()
            .or_else(|| Some(OsStr::new(self.worktree.read(cx).root_name())))
            .map(Into::into)
    }

    /// Whether the underlying entry has been deleted from the worktree.
    pub fn is_deleted(&self) -> bool {
        self.entry_id.is_none()
    }

    /// The inverse of `is_deleted`.
    pub fn exists(&self) -> bool {
        !self.is_deleted()
    }

    /// Saves the buffer's contents. For a local worktree this writes to disk
    /// (and notifies collaborators when shared); for a remote worktree it
    /// asks the host to save. Resolves to the saved version vector and mtime.
    pub fn save(
        &self,
        buffer_id: u64,
        text: Rope,
        version: time::Global,
        cx: &mut MutableAppContext,
    ) -> Task<Result<(time::Global, SystemTime)>> {
        self.worktree.update(cx, |worktree, cx| match worktree {
            Worktree::Local(worktree) => {
                let rpc = worktree.rpc.clone();
                let save = worktree.save(self.path.clone(), text, cx);
                cx.spawn(|_, _| async move {
                    let entry = save.await?;
                    // If shared, let collaborators know the buffer was saved.
                    if let Some((rpc, worktree_id)) = rpc {
                        rpc.send(proto::BufferSaved {
                            worktree_id,
                            buffer_id,
                            version: (&version).into(),
                            mtime: Some(entry.mtime.into()),
                        })
                        .await?;
                    }
                    Ok((version, entry.mtime))
                })
            }
            Worktree::Remote(worktree) => {
                let rpc = worktree.rpc.clone();
                let worktree_id = worktree.remote_id;
                cx.spawn(|_, _| async move {
                    let response = rpc
                        .request(proto::SaveBuffer {
                            worktree_id,
                            buffer_id,
                        })
                        .await?;
                    let version = response.version.try_into()?;
                    let mtime = response
                        .mtime
                        .ok_or_else(|| anyhow!("missing mtime"))?
                        .into();
                    Ok((version, mtime))
                })
            }
        })
    }

    /// The id of the worktree this file belongs to.
    pub fn worktree_id(&self) -> usize {
        self.worktree.id()
    }

    /// NOTE(review): despite the name, this identifies the file by worktree
    /// id and path — not by the entry's own id.
    pub fn entry_id(&self) -> (usize, Arc<Path>) {
        (self.worktree.id(), self.path())
    }
}
1873
/// A single file or directory within a worktree snapshot.
#[derive(Clone, Debug)]
pub struct Entry {
    // Stable identifier; may be reused when a removed inode reappears
    // (see `Snapshot::reuse_entry_id`).
    id: usize,
    kind: EntryKind,
    // Path relative to the worktree root.
    path: Arc<Path>,
    inode: u64,
    mtime: SystemTime,
    is_symlink: bool,
    // Whether some `.gitignore` in scope excludes this path.
    is_ignored: bool,
}
1884
/// The kind of a worktree entry.
#[derive(Clone, Debug)]
pub enum EntryKind {
    /// A directory that has been discovered but whose children have not yet
    /// been scanned (converted to `Dir` by `Snapshot::populate_dir`).
    PendingDir,
    /// A fully scanned directory.
    Dir,
    /// A regular file; the `CharBag` caches its path's characters for fuzzy
    /// matching.
    File(CharBag),
}
1891
impl Entry {
    /// The entry's path relative to the worktree root.
    pub fn path(&self) -> &Arc<Path> {
        &self.path
    }

    /// The entry's inode on disk.
    pub fn inode(&self) -> u64 {
        self.inode
    }

    /// Whether a `.gitignore` in scope excludes this entry.
    pub fn is_ignored(&self) -> bool {
        self.is_ignored
    }

    /// Whether this entry is a directory (fully scanned or pending).
    fn is_dir(&self) -> bool {
        matches!(self.kind, EntryKind::Dir | EntryKind::PendingDir)
    }

    /// Whether this entry is a regular file.
    fn is_file(&self) -> bool {
        matches!(self.kind, EntryKind::File(_))
    }
}
1913
1914impl sum_tree::Item for Entry {
1915 type Summary = EntrySummary;
1916
1917 fn summary(&self) -> Self::Summary {
1918 let file_count;
1919 let visible_file_count;
1920 if self.is_file() {
1921 file_count = 1;
1922 if self.is_ignored {
1923 visible_file_count = 0;
1924 } else {
1925 visible_file_count = 1;
1926 }
1927 } else {
1928 file_count = 0;
1929 visible_file_count = 0;
1930 }
1931
1932 EntrySummary {
1933 max_path: self.path().clone(),
1934 file_count,
1935 visible_file_count,
1936 }
1937 }
1938}
1939
/// Entries are keyed (and therefore ordered) by their root-relative path.
impl sum_tree::KeyedItem for Entry {
    type Key = PathKey;

    fn key(&self) -> Self::Key {
        PathKey(self.path().clone())
    }
}
1947
/// Aggregated data for a subtree of `Entry` items: the greatest path in the
/// subtree plus running file counts.
#[derive(Clone, Debug)]
pub struct EntrySummary {
    // The largest (last) path covered by the summarized subtree.
    max_path: Arc<Path>,
    file_count: usize,
    // Files not excluded by any `.gitignore`.
    visible_file_count: usize,
}
1954
1955impl Default for EntrySummary {
1956 fn default() -> Self {
1957 Self {
1958 max_path: Arc::from(Path::new("")),
1959 file_count: 0,
1960 visible_file_count: 0,
1961 }
1962 }
1963}
1964
impl sum_tree::Summary for EntrySummary {
    type Context = ();

    /// Combines with the summary to the right: the max path is the right
    /// side's (paths ascend left to right) and the counts accumulate.
    fn add_summary(&mut self, rhs: &Self, _: &()) {
        self.max_path = rhs.max_path.clone();
        self.file_count += rhs.file_count;
        self.visible_file_count += rhs.visible_file_count;
    }
}
1974
/// An id-ordered record of an entry, used by `Snapshot::entries_by_id` for
/// id-based lookup and snapshot diffing.
#[derive(Clone, Debug)]
struct PathEntry {
    id: usize,
    path: Arc<Path>,
    // Scan pass in which this entry was last inserted or updated.
    scan_id: usize,
}
1981
impl sum_tree::Item for PathEntry {
    type Summary = PathEntrySummary;

    /// The summary of a single record is just its id.
    fn summary(&self) -> Self::Summary {
        PathEntrySummary { max_id: self.id }
    }
}
1989
/// `PathEntry` records are keyed (and therefore ordered) by entry id.
impl sum_tree::KeyedItem for PathEntry {
    type Key = usize;

    fn key(&self) -> Self::Key {
        self.id
    }
}
1997
/// Aggregated data for a subtree of `PathEntry` records: the greatest id.
#[derive(Clone, Debug, Default)]
struct PathEntrySummary {
    max_id: usize,
}
2002
impl sum_tree::Summary for PathEntrySummary {
    type Context = ();

    /// Ids ascend left to right, so the combined max is the right side's.
    fn add_summary(&mut self, summary: &Self, _: &Self::Context) {
        self.max_id = summary.max_id;
    }
}
2010
/// Lets a cursor over the id-ordered tree seek by raw entry id.
impl<'a> sum_tree::Dimension<'a, PathEntrySummary> for usize {
    fn add_summary(&mut self, summary: &'a PathEntrySummary, _: &()) {
        *self = summary.max_id;
    }
}
2016
/// Ordering key for `Entry` items in the path-ordered tree: the entry's
/// root-relative path.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct PathKey(Arc<Path>);
2019
2020impl Default for PathKey {
2021 fn default() -> Self {
2022 Self(Path::new("").into())
2023 }
2024}
2025
/// Lets a cursor over the path-ordered tree seek by `PathKey`.
impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.0 = summary.max_path.clone();
    }
}
2031
/// A seek target in the path-ordered tree: either the path itself, or the
/// first path after the path's entire subtree (`Successor`).
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum PathSearch<'a> {
    Exact(&'a Path),
    Successor(&'a Path),
}
2037
2038impl<'a> Ord for PathSearch<'a> {
2039 fn cmp(&self, other: &Self) -> cmp::Ordering {
2040 match (self, other) {
2041 (Self::Exact(a), Self::Exact(b)) => a.cmp(b),
2042 (Self::Successor(a), Self::Exact(b)) => {
2043 if b.starts_with(a) {
2044 cmp::Ordering::Greater
2045 } else {
2046 a.cmp(b)
2047 }
2048 }
2049 _ => unreachable!("not sure we need the other two cases"),
2050 }
2051 }
2052}
2053
/// Delegates to the total order defined by `Ord`.
impl<'a> PartialOrd for PathSearch<'a> {
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}
2059
2060impl<'a> Default for PathSearch<'a> {
2061 fn default() -> Self {
2062 Self::Exact(Path::new("").into())
2063 }
2064}
2065
/// Lets a cursor over the path-ordered tree seek by `PathSearch` targets.
impl<'a: 'b, 'b> sum_tree::Dimension<'a, EntrySummary> for PathSearch<'b> {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        *self = Self::Exact(summary.max_path.as_ref());
    }
}
2071
/// Cursor dimension counting all files seen so far, ignored or not.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct FileCount(usize);
2074
/// Accumulates the file count as the cursor advances.
impl<'a> sum_tree::Dimension<'a, EntrySummary> for FileCount {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.0 += summary.file_count;
    }
}
2080
/// Cursor dimension counting only non-ignored files seen so far.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct VisibleFileCount(usize);
2083
/// Accumulates the visible (non-ignored) file count as the cursor advances.
impl<'a> sum_tree::Dimension<'a, EntrySummary> for VisibleFileCount {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.0 += summary.visible_file_count;
    }
}
2089
/// Scans a worktree's directory tree off the main thread, mutating the shared
/// snapshot as it goes and reporting progress via `notify`.
struct BackgroundScanner {
    fs: Arc<dyn Fs>,
    // Snapshot shared with the foreground worktree; updated in place.
    snapshot: Arc<Mutex<Snapshot>>,
    // Channel used to publish Scanning / Idle / Err state transitions.
    notify: Sender<ScanState>,
    executor: Arc<executor::Background>,
}
2096
2097impl BackgroundScanner {
2098 fn new(
2099 snapshot: Arc<Mutex<Snapshot>>,
2100 notify: Sender<ScanState>,
2101 fs: Arc<dyn Fs>,
2102 executor: Arc<executor::Background>,
2103 ) -> Self {
2104 Self {
2105 fs,
2106 snapshot,
2107 notify,
2108 executor,
2109 }
2110 }
2111
    /// The absolute path of the worktree root being scanned.
    fn abs_path(&self) -> Arc<Path> {
        self.snapshot.lock().abs_path.clone()
    }
2115
    /// Returns a clone of the current state of the shared snapshot.
    fn snapshot(&self) -> Snapshot {
        self.snapshot.lock().clone()
    }
2119
2120 fn run(mut self, event_stream: fsevent::EventStream) {
2121 if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() {
2122 return;
2123 }
2124
2125 if let Err(err) = smol::block_on(self.scan_dirs()) {
2126 if smol::block_on(self.notify.send(ScanState::Err(Arc::new(err)))).is_err() {
2127 return;
2128 }
2129 }
2130
2131 if smol::block_on(self.notify.send(ScanState::Idle)).is_err() {
2132 return;
2133 }
2134
2135 event_stream.run(move |events| {
2136 if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() {
2137 return false;
2138 }
2139
2140 if !smol::block_on(self.process_events(events)) {
2141 return false;
2142 }
2143
2144 if smol::block_on(self.notify.send(ScanState::Idle)).is_err() {
2145 return false;
2146 }
2147
2148 true
2149 });
2150 }
2151
    /// Test-only variant of `run` that consumes file-system events from a
    /// broadcast channel instead of a live `fsevent` stream.
    #[cfg(any(test, feature = "test-support"))]
    async fn run_test(mut self, mut events_rx: postage::broadcast::Receiver<fsevent::Event>) {
        // Initial scan, mirroring `run`: Scanning -> (possibly Err) -> Idle.
        if self.notify.send(ScanState::Scanning).await.is_err() {
            return;
        }

        if let Err(err) = self.scan_dirs().await {
            if self
                .notify
                .send(ScanState::Err(Arc::new(err)))
                .await
                .is_err()
            {
                return;
            }
        }

        if self.notify.send(ScanState::Idle).await.is_err() {
            return;
        }

        while let Some(event) = events_rx.recv().await {
            // Batch up any further events that are already queued so they are
            // processed in a single pass.
            let mut events = vec![event];
            while let Ok(event) = events_rx.try_recv() {
                events.push(event);
            }

            if self.notify.send(ScanState::Scanning).await.is_err() {
                break;
            }

            if !self.process_events(events).await {
                break;
            }

            if self.notify.send(ScanState::Idle).await.is_err() {
                break;
            }
        }
    }
2192
    /// Performs the initial recursive scan of the worktree root, fanning the
    /// directory work out across one task per executor thread.
    async fn scan_dirs(&mut self) -> Result<()> {
        let next_entry_id;
        {
            let mut snapshot = self.snapshot.lock();
            snapshot.scan_id += 1;
            next_entry_id = snapshot.next_entry_id.clone();
        }

        let path: Arc<Path> = Arc::from(Path::new(""));
        let abs_path = self.abs_path();

        // After determining whether the root entry is a file or a directory, populate the
        // snapshot's "root name", which will be used for the purpose of fuzzy matching.
        let mut root_name = abs_path
            .file_name()
            .map_or(String::new(), |f| f.to_string_lossy().to_string());
        let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
        let entry = self
            .fs
            .entry(root_char_bag, &next_entry_id, path.clone(), &abs_path)
            .await?
            .ok_or_else(|| anyhow!("root entry does not exist"))?;
        let is_dir = entry.is_dir();
        if is_dir {
            root_name.push('/');
        }

        {
            let mut snapshot = self.snapshot.lock();
            snapshot.root_name = root_name;
            snapshot.root_char_bag = root_char_bag;
        }

        self.snapshot.lock().insert_entry(entry);
        if is_dir {
            // Seed the work queue with the root; each scanned directory then
            // enqueues its own subdirectories through `scan_queue`.
            let (tx, rx) = channel::unbounded();
            tx.send(ScanJob {
                abs_path: abs_path.to_path_buf(),
                path,
                ignore_stack: IgnoreStack::none(),
                scan_queue: tx.clone(),
            })
            .await
            .unwrap();
            // Drop our sender so the channel closes once all in-flight jobs
            // (each holding a clone) have completed.
            drop(tx);

            self.executor
                .scoped(|scope| {
                    for _ in 0..self.executor.threads() {
                        scope.spawn(async {
                            // Workers drain the queue until it closes.
                            while let Ok(job) = rx.recv().await {
                                if let Err(err) = self
                                    .scan_dir(root_char_bag, next_entry_id.clone(), &job)
                                    .await
                                {
                                    log::error!("error scanning {:?}: {}", job.abs_path, err);
                                }
                            }
                        });
                    }
                })
                .await;
        }

        Ok(())
    }
2259
    /// Scans a single directory: records its children in the snapshot,
    /// applies any `.gitignore` found there, and enqueues subdirectories onto
    /// the shared scan queue.
    async fn scan_dir(
        &self,
        root_char_bag: CharBag,
        next_entry_id: Arc<AtomicUsize>,
        job: &ScanJob,
    ) -> Result<()> {
        let mut new_entries: Vec<Entry> = Vec::new();
        let mut new_jobs: Vec<ScanJob> = Vec::new();
        let mut ignore_stack = job.ignore_stack.clone();
        let mut new_ignore = None;

        let mut child_entries = self
            .fs
            .child_entries(
                root_char_bag,
                next_entry_id.as_ref(),
                &job.path,
                &job.abs_path,
            )
            .await?;
        while let Some(child_entry) = child_entries.next().await {
            let mut child_entry = match child_entry {
                Ok(child_entry) => child_entry,
                Err(error) => {
                    // Skip unreadable entries rather than aborting the scan.
                    log::error!("error processing entry {:?}", error);
                    continue;
                }
            };
            let child_name = child_entry.path.file_name().unwrap();
            let child_abs_path = job.abs_path.join(&child_name);
            let child_path = child_entry.path.clone();

            // If we find a .gitignore, add it to the stack of ignores used to determine which paths are ignored
            if child_name == *GITIGNORE {
                let (ignore, err) = Gitignore::new(&child_abs_path);
                if let Some(err) = err {
                    log::error!("error in ignore file {:?} - {:?}", child_entry.path, err);
                }
                let ignore = Arc::new(ignore);
                ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
                new_ignore = Some(ignore);

                // Update the ignore status of any child entries we've already
                // processed to reflect the ignore file in this directory.
                // Because `.gitignore` starts with a `.`, few entries should
                // precede it, so this rework is cheap. Update the ignore
                // stacks of any already-created jobs as well.
                // NOTE: `new_jobs` parallels the directory entries within
                // `new_entries`, so advancing it once per directory entry
                // keeps the two iterators in lock step.
                let mut new_jobs = new_jobs.iter_mut();
                for entry in &mut new_entries {
                    entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
                    if entry.is_dir() {
                        new_jobs.next().unwrap().ignore_stack = if entry.is_ignored {
                            IgnoreStack::all()
                        } else {
                            ignore_stack.clone()
                        };
                    }
                }
            }

            if child_entry.is_dir() {
                let is_ignored = ignore_stack.is_path_ignored(&child_path, true);
                child_entry.is_ignored = is_ignored;
                new_entries.push(child_entry);
                // Recurse into the subdirectory via the shared queue.
                new_jobs.push(ScanJob {
                    abs_path: child_abs_path,
                    path: child_path,
                    ignore_stack: if is_ignored {
                        IgnoreStack::all()
                    } else {
                        ignore_stack.clone()
                    },
                    scan_queue: job.scan_queue.clone(),
                });
            } else {
                child_entry.is_ignored = ignore_stack.is_path_ignored(&child_path, false);
                new_entries.push(child_entry);
            };
        }

        // Commit the children, converting this directory from pending to
        // scanned, then hand the subdirectory jobs to the worker pool.
        self.snapshot
            .lock()
            .populate_dir(job.path.clone(), new_entries, new_ignore);
        for new_job in new_jobs {
            job.scan_queue.send(new_job).await.unwrap();
        }

        Ok(())
    }
2348
    /// Applies a batch of file-system events to the snapshot: affected paths
    /// are removed, re-stat'ed, and re-inserted if they still exist, and any
    /// (re)created directories are rescanned in parallel. Returns `false` if
    /// the root path can no longer be canonicalized (e.g. it was deleted).
    async fn process_events(&mut self, mut events: Vec<fsevent::Event>) -> bool {
        // Work on a private copy of the snapshot; it is swapped back in below
        // once all events have been applied.
        let mut snapshot = self.snapshot();
        snapshot.scan_id += 1;

        let root_abs_path = if let Ok(abs_path) = self.fs.canonicalize(&snapshot.abs_path).await {
            abs_path
        } else {
            return false;
        };
        let root_char_bag = snapshot.root_char_bag;
        let next_entry_id = snapshot.next_entry_id.clone();

        // Sort by path, then drop events whose path lies beneath an earlier
        // event's path — rescanning the ancestor covers the descendants.
        events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
        events.dedup_by(|a, b| a.path.starts_with(&b.path));

        // First pass: prune every affected path from the snapshot.
        for event in &events {
            match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => snapshot.remove_path(&path),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            }
        }

        // Second pass: re-stat each path and re-insert it if it still exists,
        // queueing directories for a full rescan.
        let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
        for event in events {
            let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => Arc::from(path.to_path_buf()),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            };

            match self
                .fs
                .entry(
                    snapshot.root_char_bag,
                    &next_entry_id,
                    path.clone(),
                    &event.path,
                )
                .await
            {
                Ok(Some(mut fs_entry)) => {
                    let is_dir = fs_entry.is_dir();
                    let ignore_stack = snapshot.ignore_stack_for_path(&path, is_dir);
                    // An "all" ignore stack means some ancestor is ignored,
                    // which makes this entry ignored as well.
                    fs_entry.is_ignored = ignore_stack.is_all();
                    snapshot.insert_entry(fs_entry);
                    if is_dir {
                        scan_queue_tx
                            .send(ScanJob {
                                abs_path: event.path,
                                path,
                                ignore_stack,
                                scan_queue: scan_queue_tx.clone(),
                            })
                            .await
                            .unwrap();
                    }
                }
                // `Ok(None)` means the path no longer exists; the removal in
                // the first pass already handled it.
                Ok(None) => {}
                Err(err) => {
                    // TODO - create a special 'error' entry in the entries tree to mark this
                    log::error!("error reading file on event {:?}", err);
                }
            }
        }

        *self.snapshot.lock() = snapshot;

        // Scan any directories that were created as part of this event batch.
        drop(scan_queue_tx);
        self.executor
            .scoped(|scope| {
                for _ in 0..self.executor.threads() {
                    scope.spawn(async {
                        while let Ok(job) = scan_queue_rx.recv().await {
                            if let Err(err) = self
                                .scan_dir(root_char_bag, next_entry_id.clone(), &job)
                                .await
                            {
                                log::error!("error scanning {:?}: {}", job.abs_path, err);
                            }
                        }
                    });
                }
            })
            .await;

        // Attempt to detect renames only over a single batch of file-system events.
        self.snapshot.lock().removed_entry_ids.clear();

        self.update_ignore_statuses().await;
        true
    }
2454
    /// Recomputes `is_ignored` for entries affected by `.gitignore` files that
    /// changed during the current scan, and drops cached ignores whose
    /// `.gitignore` files no longer exist. The per-directory work is fanned
    /// out across the executor's threads via a job queue.
    async fn update_ignore_statuses(&self) {
        let mut snapshot = self.snapshot();

        let mut ignores_to_update = Vec::new();
        let mut ignores_to_delete = Vec::new();
        for (parent_path, (_, scan_id)) in &snapshot.ignores {
            // An ignore touched in this scan (whose directory still exists)
            // requires its subtree's ignore statuses to be recomputed.
            if *scan_id == snapshot.scan_id && snapshot.entry_for_path(parent_path).is_some() {
                ignores_to_update.push(parent_path.clone());
            }

            // Drop cached ignores whose `.gitignore` file has been deleted.
            let ignore_path = parent_path.join(&*GITIGNORE);
            if snapshot.entry_for_path(ignore_path).is_none() {
                ignores_to_delete.push(parent_path.clone());
            }
        }

        for parent_path in ignores_to_delete {
            snapshot.ignores.remove(&parent_path);
            self.snapshot.lock().ignores.remove(&parent_path);
        }

        let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
        ignores_to_update.sort_unstable();
        let mut ignores_to_update = ignores_to_update.into_iter().peekable();
        while let Some(parent_path) = ignores_to_update.next() {
            // Skip descendants of a path that is already queued: the job for
            // the ancestor recurses into them anyway.
            while ignores_to_update
                .peek()
                .map_or(false, |p| p.starts_with(&parent_path))
            {
                ignores_to_update.next().unwrap();
            }

            let ignore_stack = snapshot.ignore_stack_for_path(&parent_path, true);
            ignore_queue_tx
                .send(UpdateIgnoreStatusJob {
                    path: parent_path,
                    ignore_stack,
                    ignore_queue: ignore_queue_tx.clone(),
                })
                .await
                .unwrap();
        }
        // Close the channel so workers exit once the queue drains; in-flight
        // jobs hold their own sender clones for enqueueing child directories.
        drop(ignore_queue_tx);

        self.executor
            .scoped(|scope| {
                for _ in 0..self.executor.threads() {
                    scope.spawn(async {
                        while let Ok(job) = ignore_queue_rx.recv().await {
                            self.update_ignore_status(job, &snapshot).await;
                        }
                    });
                }
            })
            .await;
    }
2511
    /// Recomputes `is_ignored` for the children of `job.path`, queues a
    /// follow-up job for each child directory, and writes any entries whose
    /// status changed back into the shared snapshot.
    async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) {
        let mut ignore_stack = job.ignore_stack;
        // Include this directory's own `.gitignore`, if it has one.
        if let Some((ignore, _)) = snapshot.ignores.get(&job.path) {
            ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
        }

        let mut edits = Vec::new();
        for mut entry in snapshot.child_entries(&job.path).cloned() {
            let was_ignored = entry.is_ignored;
            entry.is_ignored = ignore_stack.is_path_ignored(entry.path(), entry.is_dir());
            if entry.is_dir() {
                // An ignored directory ignores everything beneath it.
                let child_ignore_stack = if entry.is_ignored {
                    IgnoreStack::all()
                } else {
                    ignore_stack.clone()
                };
                job.ignore_queue
                    .send(UpdateIgnoreStatusJob {
                        path: entry.path().clone(),
                        ignore_stack: child_ignore_stack,
                        ignore_queue: job.ignore_queue.clone(),
                    })
                    .await
                    .unwrap();
            }

            // Only entries whose status actually changed need rewriting.
            if entry.is_ignored != was_ignored {
                edits.push(Edit::Insert(entry));
            }
        }
        self.snapshot.lock().entries_by_path.edit(edits, &());
    }
2544}
2545
2546async fn refresh_entry(
2547 fs: &dyn Fs,
2548 snapshot: &Mutex<Snapshot>,
2549 path: Arc<Path>,
2550 abs_path: &Path,
2551) -> Result<Entry> {
2552 let root_char_bag;
2553 let next_entry_id;
2554 {
2555 let snapshot = snapshot.lock();
2556 root_char_bag = snapshot.root_char_bag;
2557 next_entry_id = snapshot.next_entry_id.clone();
2558 }
2559 let entry = fs
2560 .entry(root_char_bag, &next_entry_id, path, abs_path)
2561 .await?
2562 .ok_or_else(|| anyhow!("could not read saved file metadata"))?;
2563 Ok(snapshot.lock().insert_entry(entry))
2564}
2565
2566fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
2567 let mut result = root_char_bag;
2568 result.extend(
2569 path.to_string_lossy()
2570 .chars()
2571 .map(|c| c.to_ascii_lowercase()),
2572 );
2573 result
2574}
2575
/// A unit of work for the background scanner: scan the directory at `path`.
struct ScanJob {
    /// Absolute path of the directory to scan.
    abs_path: PathBuf,
    /// Path of the directory relative to the worktree root.
    path: Arc<Path>,
    /// Ignore rules in effect from this directory's ancestor chain.
    ignore_stack: Arc<IgnoreStack>,
    /// Queue used to submit jobs for subdirectories discovered by this scan.
    scan_queue: Sender<ScanJob>,
}
2582
/// A unit of work for recomputing ignore statuses: update the children of
/// `path` under the given ignore rules.
struct UpdateIgnoreStatusJob {
    /// Path (relative to the worktree root) of the directory to update.
    path: Arc<Path>,
    /// Ignore rules in effect from this directory's ancestor chain.
    ignore_stack: Arc<IgnoreStack>,
    /// Queue used to submit follow-up jobs for child directories.
    ignore_queue: Sender<UpdateIgnoreStatusJob>,
}
2588
/// Extension methods on worktree model handles.
pub trait WorktreeHandle {
    /// Test-only helper: mutates the worktree's directory and waits until the
    /// worktree has observed the mutations, flushing any pending FS events.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()>;
}
2596
impl WorktreeHandle for ModelHandle<Worktree> {
    // The worktree's FS event stream sometimes delivers "redundant" events for FS changes that
    // occurred before the worktree was constructed. These events can cause the worktree to perform
    // extra directory scans, and emit extra scan-state notifications.
    //
    // This function mutates the worktree's directory and waits for those mutations to be picked up,
    // to ensure that all redundant FS events have already been processed.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()> {
        use smol::future::FutureExt;

        let filename = "fs-event-sentinel";
        let root_path = cx.read(|cx| self.read(cx).abs_path.clone());
        let tree = self.clone();
        async move {
            // Create a sentinel file and wait for the worktree to notice it...
            std::fs::write(root_path.join(filename), "").unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_some())
                .await;

            // ...then delete it and wait for that to be noticed as well.
            std::fs::remove_file(root_path.join(filename)).unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_none())
                .await;

            cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
                .await;
        }
        .boxed_local()
    }
}
2629
/// Iterator over a snapshot's file entries in path order, backed by a cursor
/// into `entries_by_path`. `All` visits every file; `Visible` skips files
/// marked as ignored.
pub enum FileIter<'a> {
    All(Cursor<'a, Entry, FileCount, ()>),
    Visible(Cursor<'a, Entry, VisibleFileCount, ()>),
}
2634
impl<'a> FileIter<'a> {
    /// Creates an iterator over all file entries, positioned at the
    /// `start`-th file.
    fn all(snapshot: &'a Snapshot, start: usize) -> Self {
        let mut cursor = snapshot.entries_by_path.cursor();
        cursor.seek(&FileCount(start), Bias::Right, &());
        Self::All(cursor)
    }

    /// Creates an iterator over non-ignored file entries, positioned at the
    /// `start`-th visible file.
    fn visible(snapshot: &'a Snapshot, start: usize) -> Self {
        let mut cursor = snapshot.entries_by_path.cursor();
        cursor.seek(&VisibleFileCount(start), Bias::Right, &());
        Self::Visible(cursor)
    }

    /// Advances the underlying cursor to the next file (or visible file)
    /// after the current position.
    fn next_internal(&mut self) {
        match self {
            Self::All(cursor) => {
                let ix = *cursor.seek_start();
                cursor.seek_forward(&FileCount(ix.0 + 1), Bias::Right, &());
            }
            Self::Visible(cursor) => {
                let ix = *cursor.seek_start();
                cursor.seek_forward(&VisibleFileCount(ix.0 + 1), Bias::Right, &());
            }
        }
    }

    /// Returns the entry at the cursor's current position, if any.
    fn item(&self) -> Option<&'a Entry> {
        match self {
            Self::All(cursor) => cursor.item(),
            Self::Visible(cursor) => cursor.item(),
        }
    }
}
2668
2669impl<'a> Iterator for FileIter<'a> {
2670 type Item = &'a Entry;
2671
2672 fn next(&mut self) -> Option<Self::Item> {
2673 if let Some(entry) = self.item() {
2674 self.next_internal();
2675 Some(entry)
2676 } else {
2677 None
2678 }
2679 }
2680}
2681
/// Iterator over the immediate child entries of `parent_path` within a
/// snapshot; each child's own descendants are skipped during iteration.
struct ChildEntriesIter<'a> {
    parent_path: &'a Path,
    cursor: Cursor<'a, Entry, PathSearch<'a>, ()>,
}
2686
impl<'a> ChildEntriesIter<'a> {
    /// Positions a cursor just past `parent_path` itself, so iteration begins
    /// at its first child.
    fn new(parent_path: &'a Path, snapshot: &'a Snapshot) -> Self {
        let mut cursor = snapshot.entries_by_path.cursor();
        cursor.seek(&PathSearch::Exact(parent_path), Bias::Right, &());
        Self {
            parent_path,
            cursor,
        }
    }
}
2697
2698impl<'a> Iterator for ChildEntriesIter<'a> {
2699 type Item = &'a Entry;
2700
2701 fn next(&mut self) -> Option<Self::Item> {
2702 if let Some(item) = self.cursor.item() {
2703 if item.path().starts_with(self.parent_path) {
2704 self.cursor
2705 .seek_forward(&PathSearch::Successor(item.path()), Bias::Left, &());
2706 Some(item)
2707 } else {
2708 None
2709 }
2710 } else {
2711 None
2712 }
2713 }
2714}
2715
impl<'a> From<&'a Entry> for proto::Entry {
    /// Converts a local worktree entry into its protobuf representation for
    /// transmission to remote peers.
    fn from(entry: &'a Entry) -> Self {
        Self {
            id: entry.id as u64,
            is_dir: entry.is_dir(),
            // Paths are transmitted as (lossily-converted) UTF-8 strings.
            path: entry.path.to_string_lossy().to_string(),
            inode: entry.inode,
            mtime: Some(entry.mtime.into()),
            is_symlink: entry.is_symlink,
            is_ignored: entry.is_ignored,
        }
    }
}
2729
2730impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
2731 type Error = anyhow::Error;
2732
2733 fn try_from((root_char_bag, entry): (&'a CharBag, proto::Entry)) -> Result<Self> {
2734 if let Some(mtime) = entry.mtime {
2735 let kind = if entry.is_dir {
2736 EntryKind::Dir
2737 } else {
2738 let mut char_bag = root_char_bag.clone();
2739 char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase()));
2740 EntryKind::File(char_bag)
2741 };
2742 let path: Arc<Path> = Arc::from(Path::new(&entry.path));
2743 Ok(Entry {
2744 id: entry.id as usize,
2745 kind,
2746 path: path.clone(),
2747 inode: entry.inode,
2748 mtime: mtime.into(),
2749 is_symlink: entry.is_symlink,
2750 is_ignored: entry.is_ignored,
2751 })
2752 } else {
2753 Err(anyhow!(
2754 "missing mtime in remote worktree entry {:?}",
2755 entry.path
2756 ))
2757 }
2758 }
2759}
2760
2761mod remote {
2762 use super::*;
2763
2764 pub async fn add_peer(
2765 envelope: TypedEnvelope<proto::AddPeer>,
2766 rpc: &rpc::Client,
2767 cx: &mut AsyncAppContext,
2768 ) -> anyhow::Result<()> {
2769 rpc.state
2770 .read()
2771 .await
2772 .shared_worktree(envelope.payload.worktree_id, cx)?
2773 .update(cx, |worktree, cx| worktree.add_peer(envelope, cx))
2774 }
2775
2776 pub async fn remove_peer(
2777 envelope: TypedEnvelope<proto::RemovePeer>,
2778 rpc: &rpc::Client,
2779 cx: &mut AsyncAppContext,
2780 ) -> anyhow::Result<()> {
2781 rpc.state
2782 .read()
2783 .await
2784 .shared_worktree(envelope.payload.worktree_id, cx)?
2785 .update(cx, |worktree, cx| worktree.remove_peer(envelope, cx))
2786 }
2787
2788 pub async fn update_worktree(
2789 envelope: TypedEnvelope<proto::UpdateWorktree>,
2790 rpc: &rpc::Client,
2791 cx: &mut AsyncAppContext,
2792 ) -> anyhow::Result<()> {
2793 rpc.state
2794 .read()
2795 .await
2796 .shared_worktree(envelope.payload.worktree_id, cx)?
2797 .update(cx, |worktree, _| {
2798 if let Some(worktree) = worktree.as_remote_mut() {
2799 let mut tx = worktree.updates_tx.clone();
2800 Ok(async move {
2801 tx.send(envelope.payload)
2802 .await
2803 .expect("receiver runs to completion");
2804 })
2805 } else {
2806 Err(anyhow!(
2807 "invalid update message for local worktree {}",
2808 envelope.payload.worktree_id
2809 ))
2810 }
2811 })?
2812 .await;
2813
2814 Ok(())
2815 }
2816
2817 pub async fn open_buffer(
2818 envelope: TypedEnvelope<proto::OpenBuffer>,
2819 rpc: &rpc::Client,
2820 cx: &mut AsyncAppContext,
2821 ) -> anyhow::Result<()> {
2822 let receipt = envelope.receipt();
2823 let worktree = rpc
2824 .state
2825 .read()
2826 .await
2827 .shared_worktree(envelope.payload.worktree_id, cx)?;
2828
2829 let response = worktree
2830 .update(cx, |worktree, cx| {
2831 worktree
2832 .as_local_mut()
2833 .unwrap()
2834 .open_remote_buffer(envelope, cx)
2835 })
2836 .await?;
2837
2838 rpc.respond(receipt, response).await?;
2839
2840 Ok(())
2841 }
2842
2843 pub async fn close_buffer(
2844 envelope: TypedEnvelope<proto::CloseBuffer>,
2845 rpc: &rpc::Client,
2846 cx: &mut AsyncAppContext,
2847 ) -> anyhow::Result<()> {
2848 let worktree = rpc
2849 .state
2850 .read()
2851 .await
2852 .shared_worktree(envelope.payload.worktree_id, cx)?;
2853
2854 worktree.update(cx, |worktree, cx| {
2855 worktree
2856 .as_local_mut()
2857 .unwrap()
2858 .close_remote_buffer(envelope, cx)
2859 })
2860 }
2861
2862 pub async fn update_buffer(
2863 envelope: TypedEnvelope<proto::UpdateBuffer>,
2864 rpc: &rpc::Client,
2865 cx: &mut AsyncAppContext,
2866 ) -> anyhow::Result<()> {
2867 eprintln!("got update buffer message {:?}", envelope.payload);
2868
2869 let message = envelope.payload;
2870 rpc.state
2871 .read()
2872 .await
2873 .shared_worktree(message.worktree_id, cx)?
2874 .update(cx, |tree, cx| tree.update_buffer(message, cx))?;
2875 Ok(())
2876 }
2877
2878 pub async fn save_buffer(
2879 envelope: TypedEnvelope<proto::SaveBuffer>,
2880 rpc: &rpc::Client,
2881 cx: &mut AsyncAppContext,
2882 ) -> anyhow::Result<()> {
2883 eprintln!("got save buffer message {:?}", envelope.payload);
2884
2885 let state = rpc.state.read().await;
2886 let worktree = state.shared_worktree(envelope.payload.worktree_id, cx)?;
2887 let sender_id = envelope.original_sender_id()?;
2888 let buffer = worktree.read_with(cx, |tree, _| {
2889 tree.as_local()
2890 .unwrap()
2891 .shared_buffers
2892 .get(&sender_id)
2893 .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
2894 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))
2895 })?;
2896 let (version, mtime) = buffer.update(cx, |buffer, cx| buffer.save(cx))?.await?;
2897 rpc.respond(
2898 envelope.receipt(),
2899 proto::BufferSaved {
2900 worktree_id: envelope.payload.worktree_id,
2901 buffer_id: envelope.payload.buffer_id,
2902 version: (&version).into(),
2903 mtime: Some(mtime.into()),
2904 },
2905 )
2906 .await?;
2907 Ok(())
2908 }
2909
2910 pub async fn buffer_saved(
2911 envelope: TypedEnvelope<proto::BufferSaved>,
2912 rpc: &rpc::Client,
2913 cx: &mut AsyncAppContext,
2914 ) -> anyhow::Result<()> {
2915 eprintln!("got buffer_saved {:?}", envelope.payload);
2916
2917 rpc.state
2918 .read()
2919 .await
2920 .shared_worktree(envelope.payload.worktree_id, cx)?
2921 .update(cx, |worktree, cx| {
2922 worktree.buffer_saved(envelope.payload, cx)
2923 })?;
2924 Ok(())
2925 }
2926}
2927
2928#[cfg(test)]
2929mod tests {
2930 use super::*;
2931 use crate::test::*;
2932 use anyhow::Result;
2933 use rand::prelude::*;
2934 use serde_json::json;
2935 use std::time::UNIX_EPOCH;
2936 use std::{env, fmt::Write, os::unix, time::SystemTime};
2937
    // Verifies that an initial scan through symlinks populates the tree
    // (resolving the symlinked root and a symlinked subdirectory), and that
    // fuzzy path matching finds the expected entries.
    #[gpui::test]
    async fn test_populate_and_search(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "root": {
                "apple": "",
                "banana": {
                    "carrot": {
                        "date": "",
                        "endive": "",
                    }
                },
                "fennel": {
                    "grape": "",
                }
            }
        }));

        let root_link_path = dir.path().join("root_link");
        unix::fs::symlink(&dir.path().join("root"), &root_link_path).unwrap();
        unix::fs::symlink(
            &dir.path().join("root/fennel"),
            &dir.path().join("root/finnochio"),
        )
        .unwrap();

        let tree = cx.add_model(|cx| Worktree::local(root_link_path, Default::default(), cx));

        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        cx.read(|cx| {
            let tree = tree.read(cx);
            assert_eq!(tree.file_count(), 5);

            // Both symlinked paths resolve to the same underlying inode.
            assert_eq!(
                tree.inode_for_path("fennel/grape"),
                tree.inode_for_path("finnochio/grape")
            );

            let results = match_paths(
                Some(tree.snapshot()).iter(),
                "bna",
                false,
                false,
                false,
                10,
                Default::default(),
                cx.thread_pool().clone(),
            )
            .into_iter()
            .map(|result| result.path)
            .collect::<Vec<Arc<Path>>>();
            assert_eq!(
                results,
                vec![
                    PathBuf::from("banana/carrot/date").into(),
                    PathBuf::from("banana/carrot/endive").into(),
                ]
            );
        })
    }
2998
    // Verifies that editing a buffer and saving it writes the buffer's full
    // contents back to disk.
    #[gpui::test]
    async fn test_save_file(mut cx: gpui::TestAppContext) {
        let app_state = cx.read(build_app_state);
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));
        let tree = cx.add_model(|cx| Worktree::local(dir.path(), app_state.languages.clone(), cx));
        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        let save = buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        let new_text = std::fs::read_to_string(dir.path().join("file1")).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
3019
    // Verifies saving when the worktree root is a single file rather than a
    // directory: the file is opened via the empty relative path "".
    #[gpui::test]
    async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) {
        let app_state = cx.read(build_app_state);
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));
        let file_path = dir.path().join("file1");

        let tree =
            cx.add_model(|cx| Worktree::local(file_path.clone(), app_state.languages.clone(), cx));
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        cx.read(|cx| assert_eq!(tree.read(cx).file_count(), 1));

        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("", cx))
            .await
            .unwrap();
        let save = buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        let new_text = std::fs::read_to_string(file_path).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
3047
    // Verifies that after renames and deletions on disk: entry ids are
    // preserved across renames, open buffers follow their files' new paths,
    // deleted files are flagged, and a remote replica converges to the same
    // paths after applying the snapshot diff.
    #[gpui::test]
    async fn test_rescan_and_remote_updates(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "a": {
                "file1": "",
                "file2": "",
                "file3": "",
            },
            "b": {
                "c": {
                    "file4": "",
                    "file5": "",
                }
            }
        }));

        let tree = cx.add_model(|cx| Worktree::local(dir.path(), Default::default(), cx));

        let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| {
            let buffer = tree.update(cx, |tree, cx| tree.open_buffer(path, cx));
            async move { buffer.await.unwrap() }
        };
        let id_for_path = |path: &'static str, cx: &gpui::TestAppContext| {
            tree.read_with(cx, |tree, _| {
                tree.entry_for_path(path)
                    .expect(&format!("no entry for path {}", path))
                    .id
            })
        };

        let buffer2 = buffer_for_path("a/file2", &mut cx).await;
        let buffer3 = buffer_for_path("a/file3", &mut cx).await;
        let buffer4 = buffer_for_path("b/c/file4", &mut cx).await;
        let buffer5 = buffer_for_path("b/c/file5", &mut cx).await;

        let file2_id = id_for_path("a/file2", &cx);
        let file3_id = id_for_path("a/file3", &cx);
        let file4_id = id_for_path("b/c/file4", &cx);

        // Wait for the initial scan.
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;

        // Create a remote copy of this worktree.
        let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
        let worktree_id = 1;
        let share_request = tree
            .update(&mut cx, |tree, cx| {
                tree.as_local().unwrap().share_request(cx)
            })
            .await;
        let remote = Worktree::remote(
            proto::OpenWorktreeResponse {
                worktree_id,
                worktree: share_request.worktree,
                replica_id: 1,
                peers: Vec::new(),
            },
            rpc::Client::new(Default::default()),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        cx.read(|cx| {
            assert!(!buffer2.read(cx).is_dirty());
            assert!(!buffer3.read(cx).is_dirty());
            assert!(!buffer4.read(cx).is_dirty());
            assert!(!buffer5.read(cx).is_dirty());
        });

        // Rename and delete files and directories.
        tree.flush_fs_events(&cx).await;
        std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
        std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
        std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap();
        std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
        tree.flush_fs_events(&cx).await;

        let expected_paths = vec![
            "a",
            "a/file1",
            "a/file2.new",
            "b",
            "d",
            "d/file3",
            "d/file4",
        ];

        cx.read(|app| {
            assert_eq!(
                tree.read(app)
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );

            // Renamed entries keep their ids (rename detection).
            assert_eq!(id_for_path("a/file2.new", &cx), file2_id);
            assert_eq!(id_for_path("d/file3", &cx), file3_id);
            assert_eq!(id_for_path("d/file4", &cx), file4_id);

            // Open buffers follow their files to the new paths.
            assert_eq!(
                buffer2.read(app).file().unwrap().path().as_ref(),
                Path::new("a/file2.new")
            );
            assert_eq!(
                buffer3.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file3")
            );
            assert_eq!(
                buffer4.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file4")
            );
            assert_eq!(
                buffer5.read(app).file().unwrap().path().as_ref(),
                Path::new("b/c/file5")
            );

            assert!(!buffer2.read(app).file().unwrap().is_deleted());
            assert!(!buffer3.read(app).file().unwrap().is_deleted());
            assert!(!buffer4.read(app).file().unwrap().is_deleted());
            assert!(buffer5.read(app).file().unwrap().is_deleted());
        });

        // Update the remote worktree. Check that it becomes consistent with the
        // local worktree.
        remote.update(&mut cx, |remote, cx| {
            let update_message = tree
                .read(cx)
                .snapshot()
                .build_update(&initial_snapshot, worktree_id);
            remote
                .as_remote_mut()
                .unwrap()
                .snapshot
                .apply_update(update_message)
                .unwrap();

            assert_eq!(
                remote
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );
        });
    }
3197
    // Verifies that `.gitignore` rules are applied on the initial scan and
    // that newly created files are classified correctly on rescan (including
    // the implicit ignoring of `.git`).
    #[gpui::test]
    async fn test_rescan_with_gitignore(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            ".git": {},
            ".gitignore": "ignored-dir\n",
            "tracked-dir": {
                "tracked-file1": "tracked contents",
            },
            "ignored-dir": {
                "ignored-file1": "ignored contents",
            }
        }));

        let tree = cx.add_model(|cx| Worktree::local(dir.path(), Default::default(), cx));
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        tree.flush_fs_events(&cx).await;
        cx.read(|cx| {
            let tree = tree.read(cx);
            let tracked = tree.entry_for_path("tracked-dir/tracked-file1").unwrap();
            let ignored = tree.entry_for_path("ignored-dir/ignored-file1").unwrap();
            assert_eq!(tracked.is_ignored(), false);
            assert_eq!(ignored.is_ignored(), true);
        });

        // New files inherit the ignore status of their parent directories.
        std::fs::write(dir.path().join("tracked-dir/tracked-file2"), "").unwrap();
        std::fs::write(dir.path().join("ignored-dir/ignored-file2"), "").unwrap();
        tree.flush_fs_events(&cx).await;
        cx.read(|cx| {
            let tree = tree.read(cx);
            let dot_git = tree.entry_for_path(".git").unwrap();
            let tracked = tree.entry_for_path("tracked-dir/tracked-file2").unwrap();
            let ignored = tree.entry_for_path("ignored-dir/ignored-file2").unwrap();
            assert_eq!(tracked.is_ignored(), false);
            assert_eq!(ignored.is_ignored(), true);
            assert_eq!(dot_git.is_ignored(), true);
        });
    }
3236
    // Randomized test: repeatedly mutates a temporary directory tree,
    // delivers the generated FS events to a `BackgroundScanner` in random
    // batches, and checks snapshot invariants after each batch. Finally, a
    // fresh scanner scans the same tree from scratch and must produce an
    // identical snapshot. Iterations / operation counts / seed are
    // controllable via the ITERATIONS, OPERATIONS, INITIAL_ENTRIES and SEED
    // environment variables.
    #[test]
    fn test_random() {
        let iterations = env::var("ITERATIONS")
            .map(|i| i.parse().unwrap())
            .unwrap_or(100);
        let operations = env::var("OPERATIONS")
            .map(|o| o.parse().unwrap())
            .unwrap_or(40);
        let initial_entries = env::var("INITIAL_ENTRIES")
            .map(|o| o.parse().unwrap())
            .unwrap_or(20);
        let seeds = if let Ok(seed) = env::var("SEED").map(|s| s.parse().unwrap()) {
            seed..seed + 1
        } else {
            0..iterations
        };

        for seed in seeds {
            dbg!(seed);
            let mut rng = StdRng::seed_from_u64(seed);

            let root_dir = tempdir::TempDir::new(&format!("test-{}", seed)).unwrap();
            for _ in 0..initial_entries {
                randomly_mutate_tree(root_dir.path(), 1.0, &mut rng).unwrap();
            }
            log::info!("Generated initial tree");

            let (notify_tx, _notify_rx) = smol::channel::unbounded();
            let mut scanner = BackgroundScanner::new(
                Arc::new(Mutex::new(Snapshot {
                    id: 0,
                    scan_id: 0,
                    abs_path: root_dir.path().into(),
                    entries_by_path: Default::default(),
                    entries_by_id: Default::default(),
                    removed_entry_ids: Default::default(),
                    ignores: Default::default(),
                    root_name: Default::default(),
                    root_char_bag: Default::default(),
                    next_entry_id: Default::default(),
                })),
                notify_tx,
                Arc::new(ProductionFs),
                Arc::new(gpui::executor::Background::new()),
            );
            smol::block_on(scanner.scan_dirs()).unwrap();
            scanner.snapshot().check_invariants();

            // Interleave mutations with randomly-sized event deliveries.
            let mut events = Vec::new();
            let mut mutations_len = operations;
            while mutations_len > 1 {
                if !events.is_empty() && rng.gen_bool(0.4) {
                    let len = rng.gen_range(0..=events.len());
                    let to_deliver = events.drain(0..len).collect::<Vec<_>>();
                    log::info!("Delivering events: {:#?}", to_deliver);
                    smol::block_on(scanner.process_events(to_deliver));
                    scanner.snapshot().check_invariants();
                } else {
                    events.extend(randomly_mutate_tree(root_dir.path(), 0.6, &mut rng).unwrap());
                    mutations_len -= 1;
                }
            }
            // Deliver any remaining events, then compare against a scan from scratch.
            log::info!("Quiescing: {:#?}", events);
            smol::block_on(scanner.process_events(events));
            scanner.snapshot().check_invariants();

            let (notify_tx, _notify_rx) = smol::channel::unbounded();
            let mut new_scanner = BackgroundScanner::new(
                Arc::new(Mutex::new(Snapshot {
                    id: 0,
                    scan_id: 0,
                    abs_path: root_dir.path().into(),
                    entries_by_path: Default::default(),
                    entries_by_id: Default::default(),
                    removed_entry_ids: Default::default(),
                    ignores: Default::default(),
                    root_name: Default::default(),
                    root_char_bag: Default::default(),
                    next_entry_id: Default::default(),
                })),
                notify_tx,
                scanner.fs.clone(),
                scanner.executor.clone(),
            );
            smol::block_on(new_scanner.scan_dirs()).unwrap();
            assert_eq!(scanner.snapshot().to_vec(), new_scanner.snapshot().to_vec());
        }
    }
3325
    /// Applies one random mutation to the directory tree rooted at
    /// `root_path` — creating a file/dir, writing a `.gitignore`, renaming,
    /// or deleting — and returns synthetic FS events describing the change.
    /// `insertion_probability` biases the choice toward creation.
    fn randomly_mutate_tree(
        root_path: &Path,
        insertion_probability: f64,
        rng: &mut impl Rng,
    ) -> Result<Vec<fsevent::Event>> {
        let root_path = root_path.canonicalize().unwrap();
        let (dirs, files) = read_dir_recursive(root_path.clone());

        let mut events = Vec::new();
        let mut record_event = |path: PathBuf| {
            events.push(fsevent::Event {
                event_id: SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                flags: fsevent::StreamFlags::empty(),
                path,
            });
        };

        // Creation: forced when the tree is still (nearly) empty.
        if (files.is_empty() && dirs.len() == 1) || rng.gen_bool(insertion_probability) {
            let path = dirs.choose(rng).unwrap();
            let new_path = path.join(gen_name(rng));

            if rng.gen() {
                log::info!("Creating dir {:?}", new_path.strip_prefix(root_path)?);
                std::fs::create_dir(&new_path)?;
            } else {
                log::info!("Creating file {:?}", new_path.strip_prefix(root_path)?);
                std::fs::write(&new_path, "")?;
            }
            record_event(new_path);
        } else if rng.gen_bool(0.05) {
            // Occasionally write a .gitignore listing a random subset of the
            // chosen directory's descendants.
            let ignore_dir_path = dirs.choose(rng).unwrap();
            let ignore_path = ignore_dir_path.join(&*GITIGNORE);

            let (subdirs, subfiles) = read_dir_recursive(ignore_dir_path.clone());
            let files_to_ignore = {
                let len = rng.gen_range(0..=subfiles.len());
                subfiles.choose_multiple(rng, len)
            };
            let dirs_to_ignore = {
                let len = rng.gen_range(0..subdirs.len());
                subdirs.choose_multiple(rng, len)
            };

            let mut ignore_contents = String::new();
            for path_to_ignore in files_to_ignore.chain(dirs_to_ignore) {
                write!(
                    ignore_contents,
                    "{}\n",
                    path_to_ignore
                        .strip_prefix(&ignore_dir_path)?
                        .to_str()
                        .unwrap()
                )
                .unwrap();
            }
            log::info!(
                "Creating {:?} with contents:\n{}",
                ignore_path.strip_prefix(&root_path)?,
                ignore_contents
            );
            std::fs::write(&ignore_path, ignore_contents).unwrap();
            record_event(ignore_path);
        } else {
            // Pick an existing file or non-root directory to rename or delete.
            let old_path = {
                let file_path = files.choose(rng);
                let dir_path = dirs[1..].choose(rng);
                file_path.into_iter().chain(dir_path).choose(rng).unwrap()
            };

            let is_rename = rng.gen();
            if is_rename {
                // Never move a directory into its own subtree.
                let new_path_parent = dirs
                    .iter()
                    .filter(|d| !d.starts_with(old_path))
                    .choose(rng)
                    .unwrap();

                let overwrite_existing_dir =
                    !old_path.starts_with(&new_path_parent) && rng.gen_bool(0.3);
                let new_path = if overwrite_existing_dir {
                    std::fs::remove_dir_all(&new_path_parent).ok();
                    new_path_parent.to_path_buf()
                } else {
                    new_path_parent.join(gen_name(rng))
                };

                log::info!(
                    "Renaming {:?} to {}{:?}",
                    old_path.strip_prefix(&root_path)?,
                    if overwrite_existing_dir {
                        "overwrite "
                    } else {
                        ""
                    },
                    new_path.strip_prefix(&root_path)?
                );
                std::fs::rename(&old_path, &new_path)?;
                record_event(old_path.clone());
                record_event(new_path);
            } else if old_path.is_dir() {
                let (dirs, files) = read_dir_recursive(old_path.clone());

                log::info!("Deleting dir {:?}", old_path.strip_prefix(&root_path)?);
                std::fs::remove_dir_all(&old_path).unwrap();
                // Record an event for every path the deletion removed.
                for file in files {
                    record_event(file);
                }
                for dir in dirs {
                    record_event(dir);
                }
            } else {
                log::info!("Deleting file {:?}", old_path.strip_prefix(&root_path)?);
                std::fs::remove_file(old_path).unwrap();
                record_event(old_path.clone());
            }
        }

        Ok(events)
    }
3448
3449 fn read_dir_recursive(path: PathBuf) -> (Vec<PathBuf>, Vec<PathBuf>) {
3450 let child_entries = std::fs::read_dir(&path).unwrap();
3451 let mut dirs = vec![path];
3452 let mut files = Vec::new();
3453 for child_entry in child_entries {
3454 let child_path = child_entry.unwrap().path();
3455 if child_path.is_dir() {
3456 let (child_dirs, child_files) = read_dir_recursive(child_path);
3457 dirs.extend(child_dirs);
3458 files.extend(child_files);
3459 } else {
3460 files.push(child_path);
3461 }
3462 }
3463 (dirs, files)
3464 }
3465
3466 fn gen_name(rng: &mut impl Rng) -> String {
3467 (0..6)
3468 .map(|_| rng.sample(rand::distributions::Alphanumeric))
3469 .map(char::from)
3470 .collect()
3471 }
3472
3473 impl Snapshot {
3474 fn check_invariants(&self) {
3475 let mut files = self.files(0);
3476 let mut visible_files = self.visible_files(0);
3477 for entry in self.entries_by_path.cursor::<(), ()>() {
3478 if entry.is_file() {
3479 assert_eq!(files.next().unwrap().inode(), entry.inode);
3480 if !entry.is_ignored {
3481 assert_eq!(visible_files.next().unwrap().inode(), entry.inode);
3482 }
3483 }
3484 }
3485 assert!(files.next().is_none());
3486 assert!(visible_files.next().is_none());
3487
3488 let mut bfs_paths = Vec::new();
3489 let mut stack = vec![Path::new("")];
3490 while let Some(path) = stack.pop() {
3491 bfs_paths.push(path);
3492 let ix = stack.len();
3493 for child_entry in self.child_entries(path) {
3494 stack.insert(ix, child_entry.path());
3495 }
3496 }
3497
3498 let dfs_paths = self
3499 .entries_by_path
3500 .cursor::<(), ()>()
3501 .map(|e| e.path().as_ref())
3502 .collect::<Vec<_>>();
3503 assert_eq!(bfs_paths, dfs_paths);
3504
3505 for (ignore_parent_path, _) in &self.ignores {
3506 assert!(self.entry_for_path(ignore_parent_path).is_some());
3507 assert!(self
3508 .entry_for_path(ignore_parent_path.join(&*GITIGNORE))
3509 .is_some());
3510 }
3511 }
3512
3513 fn to_vec(&self) -> Vec<(&Path, u64, bool)> {
3514 let mut paths = Vec::new();
3515 for entry in self.entries_by_path.cursor::<(), ()>() {
3516 paths.push((entry.path().as_ref(), entry.inode(), entry.is_ignored()));
3517 }
3518 paths.sort_by(|a, b| a.0.cmp(&b.0));
3519 paths
3520 }
3521 }
3522}