1mod char_bag;
2mod fuzzy;
3mod ignore;
4
5use self::{char_bag::CharBag, ignore::IgnoreStack};
6use crate::{
7 editor::{self, Buffer, History, Operation, Rope},
8 language::LanguageRegistry,
9 rpc::{self, proto},
10 sum_tree::{self, Cursor, Edit, SumTree},
11 time::{self, ReplicaId},
12 util::Bias,
13};
14use ::ignore::gitignore::Gitignore;
15use anyhow::{anyhow, Context, Result};
16use atomic::Ordering::SeqCst;
17pub use fuzzy::{match_paths, PathMatch};
18use gpui::{
19 scoped_pool, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
20 Task, WeakModelHandle,
21};
22use lazy_static::lazy_static;
23use parking_lot::Mutex;
24use postage::{
25 prelude::{Sink, Stream},
26 watch,
27};
28use smol::{
29 channel::Sender,
30 io::{AsyncReadExt, AsyncWriteExt},
31};
32use std::{
33 cmp::{self, Ordering},
34 collections::HashMap,
35 convert::{TryFrom, TryInto},
36 ffi::{OsStr, OsString},
37 fmt, fs,
38 future::Future,
39 io,
40 ops::Deref,
41 os::unix::fs::MetadataExt,
42 path::{Path, PathBuf},
43 sync::{
44 atomic::{self, AtomicUsize},
45 Arc,
46 },
47 time::{Duration, SystemTime},
48};
49use zed_rpc::{PeerId, TypedEnvelope};
50
lazy_static! {
    // File name that triggers recompilation of a directory's ignore rules
    // (see `Snapshot::insert_entry`). Kept as a lazily-initialized static
    // because `OsStr::new` is not const-constructible here.
    static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
}
54
/// Registers all worktree-related RPC message handlers on the given client.
/// Handlers live in the `remote` module (defined elsewhere in this file).
pub fn init(cx: &mut MutableAppContext, rpc: rpc::Client) {
    rpc.on_message(remote::add_peer, cx);
    rpc.on_message(remote::remove_peer, cx);
    rpc.on_message(remote::update_worktree, cx);
    rpc.on_message(remote::open_buffer, cx);
    rpc.on_message(remote::close_buffer, cx);
    rpc.on_message(remote::update_buffer, cx);
    rpc.on_message(remote::buffer_saved, cx);
    rpc.on_message(remote::save_buffer, cx);
}
65
/// Progress of the background filesystem scanner for a local worktree.
#[derive(Clone, Debug)]
enum ScanState {
    // The scanner has caught up with the filesystem.
    Idle,
    // A scan is in progress; the snapshot may lag behind the filesystem.
    Scanning,
    // The scan failed with an I/O error.
    Err(Arc<io::Error>),
}
72
/// A directory tree being edited, either backed by the local filesystem
/// (`Local`) or replicated from a collaborator over RPC (`Remote`).
pub enum Worktree {
    Local(LocalWorktree),
    Remote(RemoteWorktree),
}
77
impl Entity for Worktree {
    type Event = ();

    // When the model is dropped, tell the server we are no longer
    // participating in the worktree. Remote worktrees always have an rpc
    // client; local ones only have one after `share` has been called.
    fn release(&mut self, cx: &mut MutableAppContext) {
        let rpc = match self {
            Self::Local(tree) => tree.rpc.clone(),
            Self::Remote(tree) => Some((tree.rpc.clone(), tree.remote_id)),
        };

        if let Some((rpc, worktree_id)) = rpc {
            // Spawned and detached: release cannot be async, and a failure to
            // notify the server is only logged, never surfaced.
            cx.spawn(|_| async move {
                rpc.state
                    .write()
                    .await
                    .shared_worktrees
                    .remove(&worktree_id);
                if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await {
                    log::error!("error closing worktree {}: {}", worktree_id, err);
                }
            })
            .detach();
        }
    }
}
102
103impl Worktree {
104 pub fn local(
105 path: impl Into<Arc<Path>>,
106 languages: Arc<LanguageRegistry>,
107 cx: &mut ModelContext<Worktree>,
108 ) -> Self {
109 Worktree::Local(LocalWorktree::new(path, languages, cx))
110 }
111
112 pub async fn open_remote(
113 rpc: rpc::Client,
114 id: u64,
115 access_token: String,
116 languages: Arc<LanguageRegistry>,
117 cx: &mut AsyncAppContext,
118 ) -> Result<ModelHandle<Self>> {
119 let response = rpc
120 .request(proto::OpenWorktree {
121 worktree_id: id,
122 access_token,
123 })
124 .await?;
125
126 Worktree::remote(response, rpc, languages, cx).await
127 }
128
129 async fn remote(
130 open_response: proto::OpenWorktreeResponse,
131 rpc: rpc::Client,
132 languages: Arc<LanguageRegistry>,
133 cx: &mut AsyncAppContext,
134 ) -> Result<ModelHandle<Self>> {
135 let worktree = open_response
136 .worktree
137 .ok_or_else(|| anyhow!("empty worktree"))?;
138
139 let remote_id = open_response.worktree_id;
140 let replica_id = open_response.replica_id as ReplicaId;
141 let peers = open_response.peers;
142 let root_char_bag: CharBag = worktree
143 .root_name
144 .chars()
145 .map(|c| c.to_ascii_lowercase())
146 .collect();
147 let root_name = worktree.root_name.clone();
148 let (entries, paths_by_id) = cx
149 .background()
150 .spawn(async move {
151 let mut paths_by_id = rpds::RedBlackTreeMapSync::default();
152 let mut edits = Vec::new();
153 for entry in worktree.entries {
154 match Entry::try_from((&root_char_bag, entry)) {
155 Ok(entry) => {
156 paths_by_id.insert_mut(entry.id as usize, (entry.path.clone(), 0));
157 edits.push(Edit::Insert(entry));
158 }
159 Err(err) => log::warn!("error for remote worktree entry {:?}", err),
160 }
161 }
162 let mut entries = SumTree::new();
163 entries.edit(edits, &());
164 (entries, paths_by_id)
165 })
166 .await;
167
168 let worktree = cx.update(|cx| {
169 cx.add_model(|cx: &mut ModelContext<Worktree>| {
170 let snapshot = Snapshot {
171 id: cx.model_id(),
172 scan_id: 0,
173 abs_path: Path::new("").into(),
174 root_name,
175 root_char_bag,
176 ignores: Default::default(),
177 entries,
178 paths_by_id,
179 removed_entry_ids: Default::default(),
180 next_entry_id: Default::default(),
181 };
182
183 let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
184 let (mut snapshot_tx, mut snapshot_rx) =
185 postage::watch::channel_with(snapshot.clone());
186
187 cx.background()
188 .spawn(async move {
189 while let Some(update) = updates_rx.recv().await {
190 let mut snapshot = snapshot_tx.borrow().clone();
191 if let Err(error) = snapshot.apply_update(update) {
192 log::error!("error applying worktree update: {}", error);
193 }
194 *snapshot_tx.borrow_mut() = snapshot;
195 }
196 })
197 .detach();
198
199 cx.spawn_weak(|this, mut cx| async move {
200 while let Some(snapshot) = snapshot_rx.recv().await {
201 if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
202 this.update(&mut cx, |this, cx| {
203 let this = this.as_remote_mut().unwrap();
204 this.snapshot = snapshot;
205 cx.notify();
206 this.update_open_buffers(cx);
207 });
208 } else {
209 break;
210 }
211 }
212 })
213 .detach();
214
215 Worktree::Remote(RemoteWorktree {
216 remote_id,
217 replica_id,
218 snapshot,
219 updates_tx,
220 rpc: rpc.clone(),
221 open_buffers: Default::default(),
222 peers: peers
223 .into_iter()
224 .map(|p| (PeerId(p.peer_id), p.replica_id as ReplicaId))
225 .collect(),
226 languages,
227 })
228 })
229 });
230 rpc.state
231 .write()
232 .await
233 .shared_worktrees
234 .insert(open_response.worktree_id, worktree.downgrade());
235
236 Ok(worktree)
237 }
238
239 pub fn as_local(&self) -> Option<&LocalWorktree> {
240 if let Worktree::Local(worktree) = self {
241 Some(worktree)
242 } else {
243 None
244 }
245 }
246
247 pub fn as_local_mut(&mut self) -> Option<&mut LocalWorktree> {
248 if let Worktree::Local(worktree) = self {
249 Some(worktree)
250 } else {
251 None
252 }
253 }
254
255 pub fn as_remote_mut(&mut self) -> Option<&mut RemoteWorktree> {
256 if let Worktree::Remote(worktree) = self {
257 Some(worktree)
258 } else {
259 None
260 }
261 }
262
263 pub fn snapshot(&self) -> Snapshot {
264 match self {
265 Worktree::Local(worktree) => worktree.snapshot(),
266 Worktree::Remote(worktree) => worktree.snapshot(),
267 }
268 }
269
270 pub fn replica_id(&self) -> ReplicaId {
271 match self {
272 Worktree::Local(_) => 0,
273 Worktree::Remote(worktree) => worktree.replica_id,
274 }
275 }
276
277 pub fn add_peer(
278 &mut self,
279 envelope: TypedEnvelope<proto::AddPeer>,
280 cx: &mut ModelContext<Worktree>,
281 ) -> Result<()> {
282 match self {
283 Worktree::Local(worktree) => worktree.add_peer(envelope, cx),
284 Worktree::Remote(worktree) => worktree.add_peer(envelope, cx),
285 }
286 }
287
288 pub fn remove_peer(
289 &mut self,
290 envelope: TypedEnvelope<proto::RemovePeer>,
291 cx: &mut ModelContext<Worktree>,
292 ) -> Result<()> {
293 match self {
294 Worktree::Local(worktree) => worktree.remove_peer(envelope, cx),
295 Worktree::Remote(worktree) => worktree.remove_peer(envelope, cx),
296 }
297 }
298
299 pub fn peers(&self) -> &HashMap<PeerId, ReplicaId> {
300 match self {
301 Worktree::Local(worktree) => &worktree.peers,
302 Worktree::Remote(worktree) => &worktree.peers,
303 }
304 }
305
306 pub fn open_buffer(
307 &mut self,
308 path: impl AsRef<Path>,
309 cx: &mut ModelContext<Self>,
310 ) -> Task<Result<ModelHandle<Buffer>>> {
311 match self {
312 Worktree::Local(worktree) => worktree.open_buffer(path.as_ref(), cx),
313 Worktree::Remote(worktree) => worktree.open_buffer(path.as_ref(), cx),
314 }
315 }
316
317 #[cfg(feature = "test-support")]
318 pub fn has_open_buffer(&self, path: impl AsRef<Path>, cx: &AppContext) -> bool {
319 let open_buffers = match self {
320 Worktree::Local(worktree) => &worktree.open_buffers,
321 Worktree::Remote(worktree) => &worktree.open_buffers,
322 };
323
324 let path = path.as_ref();
325 open_buffers
326 .values()
327 .find(|buffer| {
328 if let Some(file) = buffer.upgrade(cx).and_then(|buffer| buffer.read(cx).file()) {
329 file.path.as_ref() == path
330 } else {
331 false
332 }
333 })
334 .is_some()
335 }
336
337 pub fn update_buffer(
338 &mut self,
339 envelope: proto::UpdateBuffer,
340 cx: &mut ModelContext<Self>,
341 ) -> Result<()> {
342 let open_buffers = match self {
343 Worktree::Local(worktree) => &worktree.open_buffers,
344 Worktree::Remote(worktree) => &worktree.open_buffers,
345 };
346 let buffer = open_buffers
347 .get(&(envelope.buffer_id as usize))
348 .and_then(|buf| buf.upgrade(&cx));
349
350 let buffer = if let Some(buffer) = buffer {
351 buffer
352 } else {
353 return if matches!(self, Worktree::Local(_)) {
354 Err(anyhow!(
355 "invalid buffer {} in update buffer message",
356 envelope.buffer_id
357 ))
358 } else {
359 Ok(())
360 };
361 };
362
363 let ops = envelope
364 .operations
365 .into_iter()
366 .map(|op| op.try_into())
367 .collect::<anyhow::Result<Vec<_>>>()?;
368 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
369 Ok(())
370 }
371
372 pub fn buffer_saved(
373 &mut self,
374 message: proto::BufferSaved,
375 cx: &mut ModelContext<Self>,
376 ) -> Result<()> {
377 if let Worktree::Remote(worktree) = self {
378 if let Some(buffer) = worktree
379 .open_buffers
380 .get(&(message.buffer_id as usize))
381 .and_then(|buf| buf.upgrade(&cx))
382 {
383 buffer.update(cx, |buffer, cx| {
384 buffer.did_save(message.version.try_into()?, cx)
385 })?;
386 }
387 Ok(())
388 } else {
389 Err(anyhow!(
390 "invalid buffer {} in buffer saved message",
391 message.buffer_id
392 ))
393 }
394 }
395}
396
// Both variants carry a `Snapshot`, so the worktree can be used directly
// wherever read-only snapshot access is needed.
impl Deref for Worktree {
    type Target = Snapshot;

    fn deref(&self) -> &Self::Target {
        match self {
            Worktree::Local(worktree) => &worktree.snapshot,
            Worktree::Remote(worktree) => &worktree.snapshot,
        }
    }
}
407
/// A worktree backed by the local filesystem, kept up to date by a
/// background scanner thread watching filesystem events.
pub struct LocalWorktree {
    // Foreground copy of the scanner's snapshot; refreshed by `poll_snapshot`.
    snapshot: Snapshot,
    // Snapshot shared with (and mutated by) the `BackgroundScanner` thread.
    background_snapshot: Arc<Mutex<Snapshot>>,
    // Present once the worktree is shared; completed scans are pushed here
    // so diffs can be sent to collaborators (see `share`).
    snapshots_to_send_tx: Option<Sender<Snapshot>>,
    // Latest scanner state, observable via the receiver half.
    scan_state: (watch::Sender<ScanState>, watch::Receiver<ScanState>),
    // Keeps the fsevent stream alive for the lifetime of the worktree.
    _event_stream_handle: fsevent::Handle,
    // True while a delayed `poll_snapshot` is already queued (debounce).
    poll_scheduled: bool,
    // `Some((client, remote worktree id))` after `share` succeeds.
    rpc: Option<(rpc::Client, u64)>,
    // Weak handles to every buffer opened from this tree, keyed by buffer id.
    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
    // Strong handles to buffers opened on behalf of each remote peer, so
    // they stay alive while the peer has them open.
    shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
    peers: HashMap<PeerId, ReplicaId>,
    languages: Arc<LanguageRegistry>,
}
421
422impl LocalWorktree {
423 fn new(
424 path: impl Into<Arc<Path>>,
425 languages: Arc<LanguageRegistry>,
426 cx: &mut ModelContext<Worktree>,
427 ) -> Self {
428 let abs_path = path.into();
429 let (scan_state_tx, scan_state_rx) = smol::channel::unbounded();
430 let id = cx.model_id();
431 let snapshot = Snapshot {
432 id,
433 scan_id: 0,
434 abs_path,
435 root_name: Default::default(),
436 root_char_bag: Default::default(),
437 ignores: Default::default(),
438 entries: Default::default(),
439 paths_by_id: Default::default(),
440 removed_entry_ids: Default::default(),
441 next_entry_id: Default::default(),
442 };
443 let (event_stream, event_stream_handle) =
444 fsevent::EventStream::new(&[snapshot.abs_path.as_ref()], Duration::from_millis(100));
445
446 let background_snapshot = Arc::new(Mutex::new(snapshot.clone()));
447
448 let tree = Self {
449 snapshot,
450 background_snapshot: background_snapshot.clone(),
451 snapshots_to_send_tx: None,
452 scan_state: watch::channel_with(ScanState::Scanning),
453 _event_stream_handle: event_stream_handle,
454 poll_scheduled: false,
455 open_buffers: Default::default(),
456 shared_buffers: Default::default(),
457 peers: Default::default(),
458 rpc: None,
459 languages,
460 };
461
462 std::thread::spawn(move || {
463 let scanner = BackgroundScanner::new(background_snapshot, scan_state_tx, id);
464 scanner.run(event_stream)
465 });
466
467 cx.spawn_weak(|this, mut cx| async move {
468 while let Ok(scan_state) = scan_state_rx.recv().await {
469 if let Some(handle) = cx.read(|cx| this.upgrade(&cx)) {
470 handle.update(&mut cx, |this, cx| {
471 this.as_local_mut()
472 .unwrap()
473 .observe_scan_state(scan_state, cx)
474 });
475 } else {
476 break;
477 }
478 }
479 })
480 .detach();
481
482 tree
483 }
484
485 pub fn open_buffer(
486 &mut self,
487 path: &Path,
488 cx: &mut ModelContext<Worktree>,
489 ) -> Task<Result<ModelHandle<Buffer>>> {
490 let handle = cx.handle();
491
492 // If there is already a buffer for the given path, then return it.
493 let mut existing_buffer = None;
494 self.open_buffers.retain(|_buffer_id, buffer| {
495 if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
496 if let Some(file) = buffer.read(cx.as_ref()).file() {
497 if file.worktree_id() == handle.id() && file.path.as_ref() == path {
498 existing_buffer = Some(buffer);
499 }
500 }
501 true
502 } else {
503 false
504 }
505 });
506
507 let languages = self.languages.clone();
508 let path = Arc::from(path);
509 cx.spawn(|this, mut cx| async move {
510 if let Some(existing_buffer) = existing_buffer {
511 Ok(existing_buffer)
512 } else {
513 let (file, contents) = this
514 .update(&mut cx, |this, cx| this.as_local().unwrap().load(&path, cx))
515 .await?;
516 let language = languages.select_language(&path).cloned();
517 let buffer = cx.add_model(|cx| {
518 Buffer::from_history(0, History::new(contents.into()), Some(file), language, cx)
519 });
520 this.update(&mut cx, |this, _| {
521 let this = this
522 .as_local_mut()
523 .ok_or_else(|| anyhow!("must be a local worktree"))?;
524 this.open_buffers.insert(buffer.id(), buffer.downgrade());
525 Ok(buffer)
526 })
527 }
528 })
529 }
530
531 pub fn open_remote_buffer(
532 &mut self,
533 envelope: TypedEnvelope<proto::OpenBuffer>,
534 cx: &mut ModelContext<Worktree>,
535 ) -> Task<Result<proto::OpenBufferResponse>> {
536 let peer_id = envelope.original_sender_id();
537 let path = Path::new(&envelope.payload.path);
538
539 let buffer = self.open_buffer(path, cx);
540
541 cx.spawn(|this, mut cx| async move {
542 let buffer = buffer.await?;
543 this.update(&mut cx, |this, cx| {
544 this.as_local_mut()
545 .unwrap()
546 .shared_buffers
547 .entry(peer_id?)
548 .or_default()
549 .insert(buffer.id() as u64, buffer.clone());
550
551 Ok(proto::OpenBufferResponse {
552 buffer: Some(buffer.update(cx.as_mut(), |buffer, cx| buffer.to_proto(cx))),
553 })
554 })
555 })
556 }
557
558 pub fn close_remote_buffer(
559 &mut self,
560 envelope: TypedEnvelope<proto::CloseBuffer>,
561 _: &mut ModelContext<Worktree>,
562 ) -> Result<()> {
563 if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
564 shared_buffers.remove(&envelope.payload.buffer_id);
565 }
566
567 Ok(())
568 }
569
570 pub fn add_peer(
571 &mut self,
572 envelope: TypedEnvelope<proto::AddPeer>,
573 cx: &mut ModelContext<Worktree>,
574 ) -> Result<()> {
575 let peer = envelope.payload.peer.ok_or_else(|| anyhow!("empty peer"))?;
576 self.peers
577 .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
578 cx.notify();
579 Ok(())
580 }
581
582 pub fn remove_peer(
583 &mut self,
584 envelope: TypedEnvelope<proto::RemovePeer>,
585 cx: &mut ModelContext<Worktree>,
586 ) -> Result<()> {
587 let peer_id = PeerId(envelope.payload.peer_id);
588 let replica_id = self
589 .peers
590 .remove(&peer_id)
591 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
592 self.shared_buffers.remove(&peer_id);
593 for (_, buffer) in &self.open_buffers {
594 if let Some(buffer) = buffer.upgrade(&cx) {
595 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
596 }
597 }
598 cx.notify();
599 Ok(())
600 }
601
602 pub fn scan_complete(&self) -> impl Future<Output = ()> {
603 let mut scan_state_rx = self.scan_state.1.clone();
604 async move {
605 let mut scan_state = Some(scan_state_rx.borrow().clone());
606 while let Some(ScanState::Scanning) = scan_state {
607 scan_state = scan_state_rx.recv().await;
608 }
609 }
610 }
611
612 fn observe_scan_state(&mut self, scan_state: ScanState, cx: &mut ModelContext<Worktree>) {
613 self.scan_state.0.blocking_send(scan_state).ok();
614 self.poll_snapshot(cx);
615 if !self.is_scanning() {
616 if let Some(snapshots_to_send_tx) = self.snapshots_to_send_tx.clone() {
617 if let Err(err) = smol::block_on(snapshots_to_send_tx.send(self.snapshot())) {
618 log::error!("error submitting snapshot to send {}", err);
619 }
620 }
621 }
622 }
623
624 fn poll_snapshot(&mut self, cx: &mut ModelContext<Worktree>) {
625 self.snapshot = self.background_snapshot.lock().clone();
626 if self.is_scanning() {
627 if !self.poll_scheduled {
628 cx.spawn(|this, mut cx| async move {
629 smol::Timer::after(Duration::from_millis(100)).await;
630 this.update(&mut cx, |this, cx| {
631 let worktree = this.as_local_mut().unwrap();
632 worktree.poll_scheduled = false;
633 worktree.poll_snapshot(cx);
634 })
635 })
636 .detach();
637 self.poll_scheduled = true;
638 }
639 } else {
640 let mut buffers_to_delete = Vec::new();
641 for (buffer_id, buffer) in &self.open_buffers {
642 if let Some(buffer) = buffer.upgrade(&cx) {
643 buffer.update(cx, |buffer, cx| {
644 let buffer_is_clean = !buffer.is_dirty();
645
646 if let Some(file) = buffer.file_mut() {
647 let mut file_changed = false;
648
649 if let Some(entry) = file
650 .entry_id
651 .and_then(|entry_id| self.entry_for_id(entry_id))
652 {
653 if entry.path != file.path {
654 file.path = entry.path.clone();
655 file_changed = true;
656 }
657
658 if entry.mtime != file.mtime {
659 file.mtime = entry.mtime;
660 file_changed = true;
661 if buffer_is_clean {
662 let abs_path = self.absolutize(&file.path);
663 refresh_buffer(abs_path, cx);
664 }
665 }
666 } else if let Some(entry) = self.entry_for_path(&file.path) {
667 file.entry_id = Some(entry.id);
668 file.mtime = entry.mtime;
669 if buffer_is_clean {
670 let abs_path = self.absolutize(&file.path);
671 refresh_buffer(abs_path, cx);
672 }
673 file_changed = true;
674 } else if !file.is_deleted() {
675 if buffer_is_clean {
676 cx.emit(editor::buffer::Event::Dirtied);
677 }
678 file.entry_id = None;
679 file_changed = true;
680 }
681
682 if file_changed {
683 cx.emit(editor::buffer::Event::FileHandleChanged);
684 }
685 }
686 });
687 } else {
688 buffers_to_delete.push(*buffer_id);
689 }
690 }
691
692 for buffer_id in buffers_to_delete {
693 self.open_buffers.remove(&buffer_id);
694 }
695 }
696
697 cx.notify();
698 }
699
700 fn is_scanning(&self) -> bool {
701 if let ScanState::Scanning = *self.scan_state.1.borrow() {
702 true
703 } else {
704 false
705 }
706 }
707
708 pub fn snapshot(&self) -> Snapshot {
709 self.snapshot.clone()
710 }
711
712 pub fn abs_path(&self) -> &Path {
713 self.snapshot.abs_path.as_ref()
714 }
715
716 pub fn contains_abs_path(&self, path: &Path) -> bool {
717 path.starts_with(&self.snapshot.abs_path)
718 }
719
720 fn absolutize(&self, path: &Path) -> PathBuf {
721 if path.file_name().is_some() {
722 self.snapshot.abs_path.join(path)
723 } else {
724 self.snapshot.abs_path.to_path_buf()
725 }
726 }
727
728 fn load(&self, path: &Path, cx: &mut ModelContext<Worktree>) -> Task<Result<(File, String)>> {
729 let handle = cx.handle();
730 let path = Arc::from(path);
731 let abs_path = self.absolutize(&path);
732 let background_snapshot = self.background_snapshot.clone();
733 cx.spawn(|this, mut cx| async move {
734 let mut file = smol::fs::File::open(&abs_path).await?;
735 let mut text = String::new();
736 file.read_to_string(&mut text).await?;
737 // Eagerly populate the snapshot with an updated entry for the loaded file
738 let entry = refresh_entry(&background_snapshot, path, &abs_path)?;
739 this.update(&mut cx, |this, cx| {
740 let this = this.as_local_mut().unwrap();
741 this.poll_snapshot(cx);
742 });
743 Ok((File::new(entry.id, handle, entry.path, entry.mtime), text))
744 })
745 }
746
747 pub fn save_buffer_as(
748 &self,
749 buffer: ModelHandle<Buffer>,
750 path: impl Into<Arc<Path>>,
751 text: Rope,
752 cx: &mut ModelContext<Worktree>,
753 ) -> Task<Result<File>> {
754 let save = self.save(path, text, cx);
755 cx.spawn(|this, mut cx| async move {
756 let entry = save.await?;
757 this.update(&mut cx, |this, cx| {
758 this.as_local_mut()
759 .unwrap()
760 .open_buffers
761 .insert(buffer.id(), buffer.downgrade());
762 Ok(File::new(entry.id, cx.handle(), entry.path, entry.mtime))
763 })
764 })
765 }
766
767 fn save(
768 &self,
769 path: impl Into<Arc<Path>>,
770 text: Rope,
771 cx: &mut ModelContext<Worktree>,
772 ) -> Task<Result<Entry>> {
773 let path = path.into();
774 let abs_path = self.absolutize(&path);
775 let background_snapshot = self.background_snapshot.clone();
776
777 let save = cx.background().spawn(async move {
778 let buffer_size = text.summary().bytes.min(10 * 1024);
779 let file = smol::fs::File::create(&abs_path).await?;
780 let mut writer = smol::io::BufWriter::with_capacity(buffer_size, file);
781 for chunk in text.chunks() {
782 writer.write_all(chunk.as_bytes()).await?;
783 }
784 writer.flush().await?;
785 refresh_entry(&background_snapshot, path.clone(), &abs_path)
786 });
787
788 cx.spawn(|this, mut cx| async move {
789 let entry = save.await?;
790 this.update(&mut cx, |this, cx| {
791 this.as_local_mut().unwrap().poll_snapshot(cx);
792 });
793 Ok(entry)
794 })
795 }
796
797 pub fn share(
798 &mut self,
799 rpc: rpc::Client,
800 cx: &mut ModelContext<Worktree>,
801 ) -> Task<anyhow::Result<(u64, String)>> {
802 let snapshot = self.snapshot();
803 let share_request = self.share_request(cx);
804 let handle = cx.handle();
805 cx.spawn(|this, mut cx| async move {
806 let share_request = share_request.await;
807 let share_response = rpc.request(share_request).await?;
808
809 rpc.state
810 .write()
811 .await
812 .shared_worktrees
813 .insert(share_response.worktree_id, handle.downgrade());
814
815 log::info!("sharing worktree {:?}", share_response);
816 let (snapshots_to_send_tx, snapshots_to_send_rx) =
817 smol::channel::unbounded::<Snapshot>();
818
819 cx.background()
820 .spawn({
821 let rpc = rpc.clone();
822 let worktree_id = share_response.worktree_id;
823 async move {
824 let mut prev_snapshot = snapshot;
825 while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
826 let message = snapshot.build_update(&prev_snapshot, worktree_id);
827 match rpc.send(message).await {
828 Ok(()) => prev_snapshot = snapshot,
829 Err(err) => log::error!("error sending snapshot diff {}", err),
830 }
831 }
832 }
833 })
834 .detach();
835
836 this.update(&mut cx, |worktree, _| {
837 let worktree = worktree.as_local_mut().unwrap();
838 worktree.rpc = Some((rpc, share_response.worktree_id));
839 worktree.snapshots_to_send_tx = Some(snapshots_to_send_tx);
840 });
841
842 Ok((share_response.worktree_id, share_response.access_token))
843 })
844 }
845
846 fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<proto::ShareWorktree> {
847 let snapshot = self.snapshot();
848 let root_name = self.root_name.clone();
849 cx.background().spawn(async move {
850 let entries = snapshot
851 .entries
852 .cursor::<(), ()>()
853 .map(Into::into)
854 .collect();
855 proto::ShareWorktree {
856 worktree: Some(proto::Worktree { root_name, entries }),
857 }
858 })
859 }
860}
861
862pub fn refresh_buffer(abs_path: PathBuf, cx: &mut ModelContext<Buffer>) {
863 cx.spawn(|buffer, mut cx| async move {
864 let new_text = cx
865 .background()
866 .spawn(async move {
867 let mut file = smol::fs::File::open(&abs_path).await?;
868 let mut text = String::new();
869 file.read_to_string(&mut text).await?;
870 Ok::<_, anyhow::Error>(text.into())
871 })
872 .await;
873
874 match new_text {
875 Err(error) => log::error!("error refreshing buffer after file changed: {}", error),
876 Ok(new_text) => {
877 buffer
878 .update(&mut cx, |buffer, cx| {
879 buffer.set_text_from_disk(new_text, cx)
880 })
881 .await;
882 }
883 }
884 })
885 .detach()
886}
887
// Expose the foreground snapshot's read-only API directly on the worktree.
impl Deref for LocalWorktree {
    type Target = Snapshot;

    fn deref(&self) -> &Self::Target {
        &self.snapshot
    }
}
895
// Debug output delegates to the snapshot, which holds all the tree state
// worth printing.
impl fmt::Debug for LocalWorktree {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.snapshot.fmt(f)
    }
}
901
/// A replica of a worktree hosted by another peer, kept in sync via
/// `UpdateWorktree` messages received over RPC.
pub struct RemoteWorktree {
    // Server-assigned id of the shared worktree.
    remote_id: u64,
    snapshot: Snapshot,
    rpc: rpc::Client,
    // Incoming worktree updates are queued here and applied off the main
    // thread (see `Worktree::remote`).
    updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
    // This client's replica id, assigned by the server on open.
    replica_id: ReplicaId,
    // Weak handles to every buffer opened from this tree, keyed by the
    // host's buffer id.
    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
    peers: HashMap<PeerId, ReplicaId>,
    languages: Arc<LanguageRegistry>,
}
912
impl RemoteWorktree {
    // Returns the existing buffer for `path` if one is still alive,
    // otherwise requests the buffer contents from the host over RPC and
    // builds a local replica buffer from the response.
    pub fn open_buffer(
        &mut self,
        path: &Path,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        let handle = cx.handle();
        // Look for an already-open buffer for this path; `retain` also
        // drops entries whose buffers have been released.
        let mut existing_buffer = None;
        self.open_buffers.retain(|_buffer_id, buffer| {
            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                if let Some(file) = buffer.read(cx.as_ref()).file() {
                    if file.worktree_id() == handle.id() && file.path.as_ref() == path {
                        existing_buffer = Some(buffer);
                    }
                }
                true
            } else {
                false
            }
        });

        let rpc = self.rpc.clone();
        let languages = self.languages.clone();
        let replica_id = self.replica_id;
        let remote_worktree_id = self.remote_id;
        let path = path.to_string_lossy().to_string();
        cx.spawn(|this, mut cx| async move {
            if let Some(existing_buffer) = existing_buffer {
                Ok(existing_buffer)
            } else {
                // The entry must exist in the local replica of the snapshot
                // before we ask the host for the buffer contents.
                let entry = this
                    .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned())
                    .ok_or_else(|| anyhow!("file does not exist"))?;
                let file = File::new(entry.id, handle, entry.path, entry.mtime);
                let language = languages.select_language(&path).cloned();
                let response = rpc
                    .request(proto::OpenBuffer {
                        worktree_id: remote_worktree_id as u64,
                        path,
                    })
                    .await?;
                let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
                let buffer_id = remote_buffer.id;
                let buffer = cx.add_model(|cx| {
                    Buffer::from_proto(replica_id, remote_buffer, Some(file), language, cx).unwrap()
                });
                // Key by the host's buffer id so later messages (updates,
                // saves) can find this replica.
                this.update(&mut cx, |this, _| {
                    let this = this.as_remote_mut().unwrap();
                    this.open_buffers
                        .insert(buffer_id as usize, buffer.downgrade());
                });
                Ok(buffer)
            }
        })
    }

    // Returns a clone of the current snapshot of the tree's entries.
    fn snapshot(&self) -> Snapshot {
        self.snapshot.clone()
    }

    // Records a collaborator who joined this shared worktree.
    pub fn add_peer(
        &mut self,
        envelope: TypedEnvelope<proto::AddPeer>,
        cx: &mut ModelContext<Worktree>,
    ) -> Result<()> {
        let peer = envelope.payload.peer.ok_or_else(|| anyhow!("empty peer"))?;
        self.peers
            .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
        cx.notify();
        Ok(())
    }

    // Removes a departed collaborator and clears their replica from every
    // open buffer.
    pub fn remove_peer(
        &mut self,
        envelope: TypedEnvelope<proto::RemovePeer>,
        cx: &mut ModelContext<Worktree>,
    ) -> Result<()> {
        let peer_id = PeerId(envelope.payload.peer_id);
        let replica_id = self
            .peers
            .remove(&peer_id)
            .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
        for (_, buffer) in &self.open_buffers {
            if let Some(buffer) = buffer.upgrade(&cx) {
                buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
            }
        }
        cx.notify();
        Ok(())
    }

    // Reconciles every open buffer's `File` with the entries in the latest
    // snapshot, after a worktree update has been applied.
    fn update_open_buffers(&mut self, cx: &mut ModelContext<Worktree>) {
        let mut buffers_to_delete = Vec::new();
        for (buffer_id, buffer) in &self.open_buffers {
            if let Some(buffer) = buffer.upgrade(&cx) {
                buffer.update(cx, |buffer, cx| {
                    let buffer_is_clean = !buffer.is_dirty();

                    if let Some(file) = buffer.file_mut() {
                        let mut file_changed = false;

                        if let Some(entry) = file
                            .entry_id
                            .and_then(|entry_id| self.snapshot.entry_for_id(entry_id))
                        {
                            // Entry still exists: pick up renames and mtime
                            // changes. Unlike the local worktree, there is
                            // no disk to reload from here.
                            if entry.path != file.path {
                                file.path = entry.path.clone();
                                file_changed = true;
                            }

                            if entry.mtime != file.mtime {
                                file.mtime = entry.mtime;
                                file_changed = true;
                            }
                        } else if let Some(entry) = self.snapshot.entry_for_path(&file.path) {
                            // The file reappeared at its old path:
                            // re-associate the entry.
                            file.entry_id = Some(entry.id);
                            file.mtime = entry.mtime;
                            file_changed = true;
                        } else if !file.is_deleted() {
                            // The file is gone; a clean buffer becomes dirty
                            // so the user can re-save it.
                            if buffer_is_clean {
                                cx.emit(editor::buffer::Event::Dirtied);
                            }
                            file.entry_id = None;
                            file_changed = true;
                        }

                        if file_changed {
                            cx.emit(editor::buffer::Event::FileHandleChanged);
                        }
                    }
                });
            } else {
                buffers_to_delete.push(*buffer_id);
            }
        }

        for buffer_id in buffers_to_delete {
            self.open_buffers.remove(&buffer_id);
        }
    }
}
1054
/// An immutable-ish view of a worktree's entries at a point in time.
/// Cloning is cheap: the entry tree and path map are persistent structures.
#[derive(Clone)]
pub struct Snapshot {
    // Id of the owning worktree model.
    id: usize,
    // Incremented each time an update is applied; also recorded per entry
    // in `paths_by_id` so diffs can detect which entries changed.
    scan_id: usize,
    // Absolute root path (empty for remote worktrees).
    abs_path: Arc<Path>,
    root_name: String,
    // Lowercased characters of `root_name`, for fuzzy matching.
    root_char_bag: CharBag,
    // Compiled gitignore per directory (worktree-relative path of the
    // directory containing the .gitignore), with the scan_id it was seen in.
    ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
    entries: SumTree<Entry>,
    // entry id -> (path, scan_id in which the entry last changed). Ordered
    // by id, which `build_update` relies on for its merge diff.
    paths_by_id: rpds::RedBlackTreeMapSync<usize, (Arc<Path>, usize)>,
    // NOTE(review): appears to support entry-id reuse across removals
    // (see `reuse_entry_id`, defined elsewhere) — confirm in scanner code.
    removed_entry_ids: HashMap<u64, usize>,
    // Source of fresh entry ids, shared with the background scanner.
    next_entry_id: Arc<AtomicUsize>,
}
1068
1069impl Snapshot {
    // Computes the delta from `other` (the previously-sent snapshot) to
    // `self` as an `UpdateWorktree` message. Performs an ordered merge over
    // both `paths_by_id` maps (sorted by entry id): ids only in `self` are
    // new, ids only in `other` were removed, and ids in both are re-sent
    // only if their recorded scan_id differs.
    pub fn build_update(&self, other: &Self, worktree_id: u64) -> proto::UpdateWorktree {
        let mut updated_entries = Vec::new();
        let mut removed_entries = Vec::new();
        let mut self_entries = self.paths_by_id.iter().peekable();
        let mut other_entries = other.paths_by_id.iter().peekable();
        loop {
            match (self_entries.peek(), other_entries.peek()) {
                (
                    Some((self_entry_id, (_, self_scan_id))),
                    Some((other_entry_id, (_, other_scan_id))),
                ) => match self_entry_id.cmp(other_entry_id) {
                    Ordering::Less => {
                        // Present only in the new snapshot: added entry.
                        let entry = self.entry_for_id(**self_entry_id).unwrap().into();
                        updated_entries.push(entry);
                        self_entries.next();
                    }
                    Ordering::Equal => {
                        // Present in both: re-send only if it changed since
                        // the last transmission.
                        if self_scan_id != other_scan_id {
                            let entry = self.entry_for_id(**self_entry_id).unwrap().into();
                            updated_entries.push(entry);
                        }

                        self_entries.next();
                        other_entries.next();
                    }
                    Ordering::Greater => {
                        // Present only in the old snapshot: removed entry.
                        removed_entries.push(**other_entry_id as u64);
                        other_entries.next();
                    }
                },
                (Some((self_entry_id, _)), None) => {
                    let entry = self.entry_for_id(**self_entry_id).unwrap().into();
                    updated_entries.push(entry);
                    self_entries.next();
                }
                (None, Some((other_entry_id, _))) => {
                    removed_entries.push(**other_entry_id as u64);
                    other_entries.next();
                }
                (None, None) => break,
            }
        }

        proto::UpdateWorktree {
            updated_entries,
            removed_entries,
            worktree_id,
        }
    }
1119
    // Applies a host-sent delta to this (remote) snapshot: removes deleted
    // entries, then inserts/replaces updated ones, stamping them with a
    // freshly-incremented scan_id.
    fn apply_update(&mut self, update: proto::UpdateWorktree) -> Result<()> {
        self.scan_id += 1;
        let scan_id = self.scan_id;

        let mut edits = Vec::new();
        for entry_id in update.removed_entries {
            let entry_id = entry_id as usize;
            let entry = self
                .entry_for_id(entry_id)
                .ok_or_else(|| anyhow!("unknown entry"))?;
            edits.push(Edit::Remove(PathKey(entry.path.clone())));
            self.paths_by_id.remove_mut(&entry_id);
        }

        for entry in update.updated_entries {
            let entry = Entry::try_from((&self.root_char_bag, entry))?;
            // The entry tree is keyed by path, so an entry whose path
            // changed must be removed at its old path before re-insertion.
            if let Some((path, _)) = self.paths_by_id.get(&entry.id) {
                edits.push(Edit::Remove(PathKey(path.clone())));
            }
            self.paths_by_id
                .insert_mut(entry.id, (entry.path.clone(), scan_id));
            edits.push(Edit::Insert(entry));
        }
        self.entries.edit(edits, &());

        Ok(())
    }
1147
1148 pub fn file_count(&self) -> usize {
1149 self.entries.summary().file_count
1150 }
1151
1152 pub fn visible_file_count(&self) -> usize {
1153 self.entries.summary().visible_file_count
1154 }
1155
    // Iterates over all file entries, starting at file index `start`.
    pub fn files(&self, start: usize) -> FileIter {
        FileIter::all(self, start)
    }
1159
1160 pub fn paths(&self) -> impl Iterator<Item = &Arc<Path>> {
1161 let empty_path = Path::new("");
1162 self.entries
1163 .cursor::<(), ()>()
1164 .filter(move |entry| entry.path.as_ref() != empty_path)
1165 .map(|entry| entry.path())
1166 }
1167
    /// Returns an iterator over non-ignored file entries, starting at the
    /// `start`-th visible file.
    pub fn visible_files(&self, start: usize) -> FileIter {
        FileIter::visible(self, start)
    }
1171
    /// Returns an iterator over the direct and transitive children of `path`.
    fn child_entries<'a>(&'a self, path: &'a Path) -> ChildEntriesIter<'a> {
        ChildEntriesIter::new(path, self)
    }
1175
    /// Returns the root entry (keyed by the empty path).
    ///
    /// Panics if the snapshot has not been populated yet.
    pub fn root_entry(&self) -> &Entry {
        self.entry_for_path("").unwrap()
    }
1179
    /// Returns the filename of the snapshot's root, plus a trailing slash if the snapshot's root is
    /// a directory. This is used, among other things, as the prefix for fuzzy matching.
    pub fn root_name(&self) -> &str {
        &self.root_name
    }
1185
1186 fn entry_for_path(&self, path: impl AsRef<Path>) -> Option<&Entry> {
1187 let mut cursor = self.entries.cursor::<_, ()>();
1188 if cursor.seek(&PathSearch::Exact(path.as_ref()), Bias::Left, &()) {
1189 cursor.item()
1190 } else {
1191 None
1192 }
1193 }
1194
1195 fn entry_for_id(&self, id: usize) -> Option<&Entry> {
1196 let (path, _) = self.paths_by_id.get(&id)?;
1197 self.entry_for_path(path)
1198 }
1199
1200 pub fn inode_for_path(&self, path: impl AsRef<Path>) -> Option<u64> {
1201 self.entry_for_path(path.as_ref()).map(|e| e.inode())
1202 }
1203
    /// Inserts (or replaces) a single entry in the snapshot, returning the
    /// entry as stored (its id may have been remapped by `reuse_entry_id`).
    fn insert_entry(&mut self, mut entry: Entry) -> Entry {
        // If this file is a `.gitignore`, (re)load it and register it against
        // its parent directory so ignore queries can find it.
        if !entry.is_dir() && entry.path().file_name() == Some(&GITIGNORE) {
            let (ignore, err) = Gitignore::new(self.abs_path.join(entry.path()));
            if let Some(err) = err {
                // A partially-invalid ignore file still yields usable rules;
                // log the problem and keep going.
                log::error!("error in ignore file {:?} - {:?}", entry.path(), err);
            }

            let ignore_dir_path = entry.path().parent().unwrap();
            self.ignores
                .insert(ignore_dir_path.into(), (Arc::new(ignore), self.scan_id));
        }

        self.reuse_entry_id(&mut entry);
        self.entries.insert_or_replace(entry.clone(), &());
        self.paths_by_id
            .insert_mut(entry.id, (entry.path.clone(), self.scan_id));
        entry
    }
1222
    /// Fills in the children of a directory that was previously inserted as
    /// `EntryKind::PendingDir`, marking the parent as a fully-scanned `Dir`.
    ///
    /// `ignore` is the `.gitignore` found in this directory, if any.
    fn populate_dir(
        &mut self,
        parent_path: Arc<Path>,
        entries: impl IntoIterator<Item = Entry>,
        ignore: Option<Arc<Gitignore>>,
    ) {
        let mut edits = Vec::new();

        let mut parent_entry = self
            .entries
            .get(&PathKey(parent_path.clone()), &())
            .unwrap()
            .clone();
        if let Some(ignore) = ignore {
            self.ignores.insert(parent_path, (ignore, self.scan_id));
        }
        // Only pending directories may be populated; anything else indicates a
        // bug in the scanning pipeline.
        if matches!(parent_entry.kind, EntryKind::PendingDir) {
            parent_entry.kind = EntryKind::Dir;
        } else {
            unreachable!();
        }
        edits.push(Edit::Insert(parent_entry));

        for mut entry in entries {
            self.reuse_entry_id(&mut entry);
            self.paths_by_id
                .insert_mut(entry.id, (entry.path.clone(), self.scan_id));
            edits.push(Edit::Insert(entry));
        }
        self.entries.edit(edits, &());
    }
1254
1255 fn reuse_entry_id(&mut self, entry: &mut Entry) {
1256 if let Some(removed_entry_id) = self.removed_entry_ids.remove(&entry.inode) {
1257 entry.id = removed_entry_id;
1258 } else if let Some(existing_entry) = self.entry_for_path(&entry.path) {
1259 entry.id = existing_entry.id;
1260 }
1261 }
1262
    /// Removes `path` and everything beneath it from the snapshot.
    ///
    /// The removed entries' ids are remembered by inode in `removed_entry_ids`
    /// so a subsequent re-insertion (e.g. a rename) can reuse the same id.
    fn remove_path(&mut self, path: &Path) {
        let mut new_entries;
        let removed_entry_ids;
        {
            // Split the tree into [before `path`] + [entries under `path`] +
            // [after `path`], keeping the middle slice as the removed set.
            let mut cursor = self.entries.cursor::<_, ()>();
            new_entries = cursor.slice(&PathSearch::Exact(path), Bias::Left, &());
            removed_entry_ids = cursor.slice(&PathSearch::Successor(path), Bias::Left, &());
            new_entries.push_tree(cursor.suffix(&()), &());
        }
        self.entries = new_entries;
        for entry in removed_entry_ids.cursor::<(), ()>() {
            // Keep the largest id seen for each inode so the newest entry wins
            // when ids are reused.
            let removed_entry_id = self
                .removed_entry_ids
                .entry(entry.inode)
                .or_insert(entry.id);
            *removed_entry_id = cmp::max(*removed_entry_id, entry.id);
            self.paths_by_id.remove_mut(&entry.id);
        }

        // If a `.gitignore` was removed, stamp its parent's ignore record with
        // the current scan so `update_ignore_statuses` revisits it.
        if path.file_name() == Some(&GITIGNORE) {
            if let Some((_, scan_id)) = self.ignores.get_mut(path.parent().unwrap()) {
                *scan_id = self.scan_id;
            }
        }
    }
1288
1289 fn ignore_stack_for_path(&self, path: &Path, is_dir: bool) -> Arc<IgnoreStack> {
1290 let mut new_ignores = Vec::new();
1291 for ancestor in path.ancestors().skip(1) {
1292 if let Some((ignore, _)) = self.ignores.get(ancestor) {
1293 new_ignores.push((ancestor, Some(ignore.clone())));
1294 } else {
1295 new_ignores.push((ancestor, None));
1296 }
1297 }
1298
1299 let mut ignore_stack = IgnoreStack::none();
1300 for (parent_path, ignore) in new_ignores.into_iter().rev() {
1301 if ignore_stack.is_path_ignored(&parent_path, true) {
1302 ignore_stack = IgnoreStack::all();
1303 break;
1304 } else if let Some(ignore) = ignore {
1305 ignore_stack = ignore_stack.append(Arc::from(parent_path), ignore);
1306 }
1307 }
1308
1309 if ignore_stack.is_path_ignored(path, is_dir) {
1310 ignore_stack = IgnoreStack::all();
1311 }
1312
1313 ignore_stack
1314 }
1315}
1316
1317impl fmt::Debug for Snapshot {
1318 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1319 for entry in self.entries.cursor::<(), ()>() {
1320 for _ in entry.path().ancestors().skip(1) {
1321 write!(f, " ")?;
1322 }
1323 writeln!(f, "{:?} (inode: {})", entry.path(), entry.inode())?;
1324 }
1325 Ok(())
1326 }
1327}
1328
/// A handle to a file within a worktree, used by buffers to locate and save
/// their backing file.
#[derive(Clone, PartialEq)]
pub struct File {
    // `None` when the underlying entry has been deleted from the worktree.
    entry_id: Option<usize>,
    worktree: ModelHandle<Worktree>,
    // Path relative to the worktree root.
    pub path: Arc<Path>,
    pub mtime: SystemTime,
}
1336
impl File {
    /// Creates a handle for an existing (non-deleted) worktree entry.
    pub fn new(
        entry_id: usize,
        worktree: ModelHandle<Worktree>,
        path: Arc<Path>,
        mtime: SystemTime,
    ) -> Self {
        Self {
            entry_id: Some(entry_id),
            worktree,
            path,
            mtime,
        }
    }

    /// Broadcasts a buffer edit to the remote side of this worktree, if the
    /// worktree is shared (local with peers) or remote. Errors are logged, not
    /// surfaced.
    pub fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) {
        self.worktree.update(cx, |worktree, cx| {
            // NOTE(review): for the Local arm to type-check against the Remote
            // arm, `LocalWorktree::rpc` is presumably an
            // `Option<(rpc client, remote id)>` that is `Some` only while the
            // worktree is shared — confirm against the worktree definitions.
            if let Some((rpc, remote_id)) = match worktree {
                Worktree::Local(worktree) => worktree.rpc.clone(),
                Worktree::Remote(worktree) => Some((worktree.rpc.clone(), worktree.remote_id)),
            } {
                cx.background()
                    .spawn(async move {
                        if let Err(error) = rpc
                            .send(proto::UpdateBuffer {
                                worktree_id: remote_id,
                                buffer_id,
                                // Wraps the single operation in an iterator to
                                // produce the repeated `operations` field.
                                operations: Some(operation).iter().map(Into::into).collect(),
                            })
                            .await
                        {
                            log::error!("error sending buffer operation: {}", error);
                        }
                    })
                    .detach();
            }
        });
    }

    /// Notifies the host that this remote buffer was closed. No-op for local
    /// worktrees.
    pub fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) {
        self.worktree.update(cx, |worktree, cx| {
            if let Worktree::Remote(worktree) = worktree {
                let worktree_id = worktree.remote_id;
                let rpc = worktree.rpc.clone();
                cx.background()
                    .spawn(async move {
                        if let Err(error) = rpc
                            .send(proto::CloseBuffer {
                                worktree_id,
                                buffer_id,
                            })
                            .await
                        {
                            log::error!("error closing remote buffer: {}", error);
                        };
                    })
                    .detach();
            }
        });
    }

    /// Returns this file's path relative to the root of its worktree.
    pub fn path(&self) -> Arc<Path> {
        self.path.clone()
    }

    /// Returns this file's absolute path on disk.
    pub fn abs_path(&self, cx: &AppContext) -> PathBuf {
        self.worktree.read(cx).abs_path.join(&self.path)
    }

    /// Returns the last component of this handle's absolute path. If this handle refers to the root
    /// of its worktree, then this method will return the name of the worktree itself.
    pub fn file_name<'a>(&'a self, cx: &'a AppContext) -> Option<OsString> {
        self.path
            .file_name()
            .or_else(|| Some(OsStr::new(self.worktree.read(cx).root_name())))
            .map(Into::into)
    }

    /// Whether the underlying worktree entry has been deleted.
    pub fn is_deleted(&self) -> bool {
        self.entry_id.is_none()
    }

    /// Whether the underlying worktree entry still exists.
    pub fn exists(&self) -> bool {
        !self.is_deleted()
    }

    /// Saves the given text to this file.
    ///
    /// For a local worktree, writes to disk and (if shared) notifies peers via
    /// `BufferSaved`. For a remote worktree, asks the host to save and returns
    /// the version the host reports.
    pub fn save(
        &self,
        buffer_id: u64,
        text: Rope,
        version: time::Global,
        cx: &mut MutableAppContext,
    ) -> Task<Result<time::Global>> {
        self.worktree.update(cx, |worktree, cx| match worktree {
            Worktree::Local(worktree) => {
                let rpc = worktree.rpc.clone();
                let save = worktree.save(self.path.clone(), text, cx);
                cx.spawn(|_, _| async move {
                    save.await?;
                    // `rpc` is `Some` only while the worktree is shared.
                    if let Some((rpc, worktree_id)) = rpc {
                        rpc.send(proto::BufferSaved {
                            worktree_id,
                            buffer_id,
                            version: (&version).into(),
                        })
                        .await?;
                    }
                    Ok(version)
                })
            }
            Worktree::Remote(worktree) => {
                let rpc = worktree.rpc.clone();
                let worktree_id = worktree.remote_id;
                cx.spawn(|_, _| async move {
                    let response = rpc
                        .request(proto::SaveBuffer {
                            worktree_id,
                            buffer_id,
                        })
                        .await?;
                    Ok(response.version.try_into()?)
                })
            }
        })
    }

    /// The id of the worktree model this file belongs to.
    pub fn worktree_id(&self) -> usize {
        self.worktree.id()
    }

    /// A (worktree id, path) pair identifying this file across worktrees.
    pub fn entry_id(&self) -> (usize, Arc<Path>) {
        (self.worktree.id(), self.path())
    }
}
1472
/// A single file or directory within a worktree snapshot.
#[derive(Clone, Debug)]
pub struct Entry {
    // Stable id, preserved across renames/rescans where possible.
    id: usize,
    kind: EntryKind,
    // Path relative to the worktree root; the root entry's path is empty.
    path: Arc<Path>,
    inode: u64,
    mtime: SystemTime,
    is_symlink: bool,
    // True when matched by a `.gitignore` rule anywhere up the ancestor chain.
    is_ignored: bool,
}
1483
#[derive(Clone, Debug)]
pub enum EntryKind {
    /// A directory whose children have not been scanned yet.
    PendingDir,
    /// A fully-scanned directory.
    Dir,
    /// A file, carrying the character bag used for fuzzy path matching.
    File(CharBag),
}
1490
impl Entry {
    /// The entry's path relative to the worktree root.
    pub fn path(&self) -> &Arc<Path> {
        &self.path
    }

    /// The entry's inode number on disk.
    pub fn inode(&self) -> u64 {
        self.inode
    }

    /// Whether the entry is matched by a `.gitignore` rule.
    pub fn is_ignored(&self) -> bool {
        self.is_ignored
    }

    // Pending directories count as directories here.
    fn is_dir(&self) -> bool {
        matches!(self.kind, EntryKind::Dir | EntryKind::PendingDir)
    }

    fn is_file(&self) -> bool {
        matches!(self.kind, EntryKind::File(_))
    }
}
1512
1513impl sum_tree::Item for Entry {
1514 type Summary = EntrySummary;
1515
1516 fn summary(&self) -> Self::Summary {
1517 let file_count;
1518 let visible_file_count;
1519 if self.is_file() {
1520 file_count = 1;
1521 if self.is_ignored {
1522 visible_file_count = 0;
1523 } else {
1524 visible_file_count = 1;
1525 }
1526 } else {
1527 file_count = 0;
1528 visible_file_count = 0;
1529 }
1530
1531 EntrySummary {
1532 max_path: self.path().clone(),
1533 file_count,
1534 visible_file_count,
1535 }
1536 }
1537}
1538
impl sum_tree::KeyedItem for Entry {
    type Key = PathKey;

    // Entries are ordered and looked up by their path.
    fn key(&self) -> Self::Key {
        PathKey(self.path().clone())
    }
}
1546
/// Aggregate data maintained over subtrees of entries, enabling O(log n)
/// seeks by path, file index, or visible-file index.
#[derive(Clone, Debug)]
pub struct EntrySummary {
    // The greatest path in the summarized range (paths are the sort key).
    max_path: Arc<Path>,
    file_count: usize,
    visible_file_count: usize,
}
1553
1554impl Default for EntrySummary {
1555 fn default() -> Self {
1556 Self {
1557 max_path: Arc::from(Path::new("")),
1558 file_count: 0,
1559 visible_file_count: 0,
1560 }
1561 }
1562}
1563
impl sum_tree::Summary for EntrySummary {
    type Context = ();

    // Combines an adjacent right-hand summary into this one: the max path is
    // simply the right-hand side's (entries are ordered by path), and the
    // counts accumulate.
    fn add_summary(&mut self, rhs: &Self, _: &()) {
        self.max_path = rhs.max_path.clone();
        self.file_count += rhs.file_count;
        self.visible_file_count += rhs.visible_file_count;
    }
}
1573
/// Newtype over a path, used as the sort key for entries in the sum tree.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct PathKey(Arc<Path>);
1576
1577impl Default for PathKey {
1578 fn default() -> Self {
1579 Self(Path::new("").into())
1580 }
1581}
1582
impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
    // Seeking by path: the running dimension is just the max path seen so far.
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.0 = summary.max_path.clone();
    }
}
1588
/// Seek target for path-based cursor operations.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum PathSearch<'a> {
    /// Seek to the path itself.
    Exact(&'a Path),
    /// Seek just past the path and everything beneath it.
    Successor(&'a Path),
}
1594
impl<'a> Ord for PathSearch<'a> {
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        match (self, other) {
            (Self::Exact(a), Self::Exact(b)) => a.cmp(b),
            // `Successor(a)` sorts after `a` and all of its descendants, but
            // otherwise compares like `a` itself.
            (Self::Successor(a), Self::Exact(b)) => {
                if b.starts_with(a) {
                    cmp::Ordering::Greater
                } else {
                    a.cmp(b)
                }
            }
            // Seeks only ever compare a search target (left) against stored
            // dimensions (right, always `Exact`), so the remaining
            // combinations cannot occur.
            _ => unreachable!("not sure we need the other two cases"),
        }
    }
}
1610
impl<'a> PartialOrd for PathSearch<'a> {
    // Delegates to the total order above.
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}
1616
1617impl<'a> Default for PathSearch<'a> {
1618 fn default() -> Self {
1619 Self::Exact(Path::new("").into())
1620 }
1621}
1622
impl<'a: 'b, 'b> sum_tree::Dimension<'a, EntrySummary> for PathSearch<'b> {
    // Stored dimensions are always `Exact`; `Successor` exists only as a seek
    // target.
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        *self = Self::Exact(summary.max_path.as_ref());
    }
}
1628
/// Dimension counting all file entries, used to seek to the nth file.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct FileCount(usize);
1631
impl<'a> sum_tree::Dimension<'a, EntrySummary> for FileCount {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.0 += summary.file_count;
    }
}
1637
/// Dimension counting non-ignored file entries, used to seek to the nth
/// visible file.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct VisibleFileCount(usize);
1640
impl<'a> sum_tree::Dimension<'a, EntrySummary> for VisibleFileCount {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.0 += summary.visible_file_count;
    }
}
1646
/// Scans a worktree's directory on background threads and keeps the shared
/// snapshot in sync with file-system events.
struct BackgroundScanner {
    // Shared with the foreground worktree; all updates happen under this lock.
    snapshot: Arc<Mutex<Snapshot>>,
    // Reports Scanning/Idle/Err transitions to the worktree.
    notify: Sender<ScanState>,
    thread_pool: scoped_pool::Pool,
}
1652
impl BackgroundScanner {
    fn new(snapshot: Arc<Mutex<Snapshot>>, notify: Sender<ScanState>, worktree_id: usize) -> Self {
        Self {
            snapshot,
            notify,
            // 16 threads shared by the scan and ignore-update phases.
            thread_pool: scoped_pool::Pool::new(16, format!("worktree-{}-scanner", worktree_id)),
        }
    }

    fn abs_path(&self) -> Arc<Path> {
        self.snapshot.lock().abs_path.clone()
    }

    /// Clones the current snapshot so it can be mutated without holding the lock.
    fn snapshot(&self) -> Snapshot {
        self.snapshot.lock().clone()
    }

    /// Entry point: performs the initial scan, then processes file-system
    /// events until the receiving side of `notify` is dropped.
    fn run(mut self, event_stream: fsevent::EventStream) {
        // A failed send means the worktree was dropped; stop scanning.
        if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() {
            return;
        }

        if let Err(err) = self.scan_dirs() {
            if smol::block_on(self.notify.send(ScanState::Err(Arc::new(err)))).is_err() {
                return;
            }
        }

        if smol::block_on(self.notify.send(ScanState::Idle)).is_err() {
            return;
        }

        // Bracket every event batch with Scanning/Idle notifications.
        // Returning `false` stops the event stream.
        event_stream.run(move |events| {
            if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() {
                return false;
            }

            if !self.process_events(events) {
                return false;
            }

            if smol::block_on(self.notify.send(ScanState::Idle)).is_err() {
                return false;
            }

            true
        });
    }

    /// Performs the initial full scan of the worktree root, fanning directory
    /// traversal out across the thread pool.
    fn scan_dirs(&mut self) -> io::Result<()> {
        self.snapshot.lock().scan_id += 1;

        let path: Arc<Path> = Arc::from(Path::new(""));
        let abs_path = self.abs_path();
        let metadata = fs::metadata(&abs_path)?;
        let inode = metadata.ino();
        // `fs::metadata` follows symlinks, so detect them via lstat.
        let is_symlink = fs::symlink_metadata(&abs_path)?.file_type().is_symlink();
        let is_dir = metadata.file_type().is_dir();
        let mtime = metadata.modified()?;

        // After determining whether the root entry is a file or a directory, populate the
        // snapshot's "root name", which will be used for the purpose of fuzzy matching.
        let mut root_name = abs_path
            .file_name()
            .map_or(String::new(), |f| f.to_string_lossy().to_string());
        if is_dir {
            root_name.push('/');
        }

        let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
        let next_entry_id;
        {
            let mut snapshot = self.snapshot.lock();
            snapshot.root_name = root_name;
            snapshot.root_char_bag = root_char_bag;
            next_entry_id = snapshot.next_entry_id.clone();
        }

        if is_dir {
            self.snapshot.lock().insert_entry(Entry {
                id: next_entry_id.fetch_add(1, SeqCst),
                kind: EntryKind::PendingDir,
                path: path.clone(),
                inode,
                mtime,
                is_symlink,
                is_ignored: false,
            });

            // Work-stealing queue: each job carries a sender so directory jobs
            // can enqueue their subdirectories; dropping the last sender ends
            // the scan.
            let (tx, rx) = crossbeam_channel::unbounded();
            tx.send(ScanJob {
                abs_path: abs_path.to_path_buf(),
                path,
                ignore_stack: IgnoreStack::none(),
                scan_queue: tx.clone(),
            })
            .unwrap();
            drop(tx);

            self.thread_pool.scoped(|pool| {
                for _ in 0..self.thread_pool.thread_count() {
                    pool.execute(|| {
                        while let Ok(job) = rx.recv() {
                            if let Err(err) =
                                self.scan_dir(root_char_bag, next_entry_id.clone(), &job)
                            {
                                log::error!("error scanning {:?}: {}", job.abs_path, err);
                            }
                        }
                    });
                }
            });
        } else {
            // The worktree root is a single file.
            self.snapshot.lock().insert_entry(Entry {
                id: next_entry_id.fetch_add(1, SeqCst),
                kind: EntryKind::File(char_bag_for_path(root_char_bag, &path)),
                path,
                inode,
                mtime,
                is_symlink,
                is_ignored: false,
            });
        }

        Ok(())
    }

    /// Scans one directory's children, inserting them into the snapshot and
    /// enqueueing a job for each subdirectory.
    fn scan_dir(
        &self,
        root_char_bag: CharBag,
        next_entry_id: Arc<AtomicUsize>,
        job: &ScanJob,
    ) -> io::Result<()> {
        let mut new_entries: Vec<Entry> = Vec::new();
        let mut new_jobs: Vec<ScanJob> = Vec::new();
        let mut ignore_stack = job.ignore_stack.clone();
        let mut new_ignore = None;

        for child_entry in fs::read_dir(&job.abs_path)? {
            let child_entry = child_entry?;
            let child_name = child_entry.file_name();
            let child_abs_path = job.abs_path.join(&child_name);
            let child_path: Arc<Path> = job.path.join(&child_name).into();
            let child_is_symlink = child_entry.metadata()?.file_type().is_symlink();
            // Children can disappear between readdir and stat; skip them.
            let child_metadata = if let Ok(metadata) = fs::metadata(&child_abs_path) {
                metadata
            } else {
                log::error!("could not get metadata for path {:?}", child_abs_path);
                continue;
            };

            let child_inode = child_metadata.ino();
            let child_mtime = child_metadata.modified()?;

            // If we find a .gitignore, add it to the stack of ignores used to determine which paths are ignored
            if child_name == *GITIGNORE {
                let (ignore, err) = Gitignore::new(&child_abs_path);
                if let Some(err) = err {
                    log::error!("error in ignore file {:?} - {:?}", child_path, err);
                }
                let ignore = Arc::new(ignore);
                ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
                new_ignore = Some(ignore);

                // Update ignore status of any child entries we've already processed to reflect
                // the ignore file in the current directory. Because `.gitignore` starts with a
                // `.`, it sorts early in the directory listing, so the already-processed
                // entries should rarely be numerous. Update the ignore stack associated with
                // any new jobs as well.
                let mut new_jobs = new_jobs.iter_mut();
                for entry in &mut new_entries {
                    entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
                    if entry.is_dir() {
                        // Jobs were pushed in the same order as directory
                        // entries, so they pair up one-to-one here.
                        new_jobs.next().unwrap().ignore_stack = if entry.is_ignored {
                            IgnoreStack::all()
                        } else {
                            ignore_stack.clone()
                        };
                    }
                }
            }

            if child_metadata.is_dir() {
                let is_ignored = ignore_stack.is_path_ignored(&child_path, true);
                new_entries.push(Entry {
                    id: next_entry_id.fetch_add(1, SeqCst),
                    kind: EntryKind::PendingDir,
                    path: child_path.clone(),
                    inode: child_inode,
                    mtime: child_mtime,
                    is_symlink: child_is_symlink,
                    is_ignored,
                });
                new_jobs.push(ScanJob {
                    abs_path: child_abs_path,
                    path: child_path,
                    ignore_stack: if is_ignored {
                        IgnoreStack::all()
                    } else {
                        ignore_stack.clone()
                    },
                    scan_queue: job.scan_queue.clone(),
                });
            } else {
                let is_ignored = ignore_stack.is_path_ignored(&child_path, false);
                new_entries.push(Entry {
                    id: next_entry_id.fetch_add(1, SeqCst),
                    kind: EntryKind::File(char_bag_for_path(root_char_bag, &child_path)),
                    path: child_path,
                    inode: child_inode,
                    mtime: child_mtime,
                    is_symlink: child_is_symlink,
                    is_ignored,
                });
            };
        }

        // Commit this directory's children, then enqueue subdirectory jobs.
        self.snapshot
            .lock()
            .populate_dir(job.path.clone(), new_entries, new_ignore);
        for new_job in new_jobs {
            job.scan_queue.send(new_job).unwrap();
        }

        Ok(())
    }

    /// Applies a batch of file-system events to the snapshot. Returns `false`
    /// if the worktree root no longer exists.
    fn process_events(&mut self, mut events: Vec<fsevent::Event>) -> bool {
        // Work on a clone and swap it in at the end, so readers never see a
        // partially-applied batch.
        let mut snapshot = self.snapshot();
        snapshot.scan_id += 1;

        let root_abs_path = if let Ok(abs_path) = snapshot.abs_path.canonicalize() {
            abs_path
        } else {
            return false;
        };
        let root_char_bag = snapshot.root_char_bag;
        let next_entry_id = snapshot.next_entry_id.clone();

        // Sort by path and drop events for paths covered by an earlier
        // ancestor event, since that ancestor will be rescanned anyway.
        events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
        events.dedup_by(|a, b| a.path.starts_with(&b.path));

        // First remove every affected subtree...
        for event in &events {
            match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => snapshot.remove_path(&path),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            }
        }

        // ...then re-stat each path and re-insert whatever still exists,
        // queueing directories for a recursive rescan.
        let (scan_queue_tx, scan_queue_rx) = crossbeam_channel::unbounded();
        for event in events {
            let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => Arc::from(path.to_path_buf()),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            };

            match fs_entry_for_path(
                snapshot.root_char_bag,
                &next_entry_id,
                path.clone(),
                &event.path,
            ) {
                Ok(Some(mut fs_entry)) => {
                    let is_dir = fs_entry.is_dir();
                    let ignore_stack = snapshot.ignore_stack_for_path(&path, is_dir);
                    fs_entry.is_ignored = ignore_stack.is_all();
                    snapshot.insert_entry(fs_entry);
                    if is_dir {
                        scan_queue_tx
                            .send(ScanJob {
                                abs_path: event.path,
                                path,
                                ignore_stack,
                                scan_queue: scan_queue_tx.clone(),
                            })
                            .unwrap();
                    }
                }
                Ok(None) => {}
                Err(err) => {
                    // TODO - create a special 'error' entry in the entries tree to mark this
                    log::error!("error reading file on event {:?}", err);
                }
            }
        }

        *self.snapshot.lock() = snapshot;

        // Scan any directories that were created as part of this event batch.
        drop(scan_queue_tx);
        self.thread_pool.scoped(|pool| {
            for _ in 0..self.thread_pool.thread_count() {
                pool.execute(|| {
                    while let Ok(job) = scan_queue_rx.recv() {
                        if let Err(err) = self.scan_dir(root_char_bag, next_entry_id.clone(), &job)
                        {
                            log::error!("error scanning {:?}: {}", job.abs_path, err);
                        }
                    }
                });
            }
        });

        // Attempt to detect renames only over a single batch of file-system events.
        self.snapshot.lock().removed_entry_ids.clear();

        self.update_ignore_statuses();
        true
    }

    /// Recomputes ignore flags for subtrees whose `.gitignore` files changed
    /// in the latest scan, and drops ignore records whose files disappeared.
    fn update_ignore_statuses(&self) {
        let mut snapshot = self.snapshot();

        let mut ignores_to_update = Vec::new();
        let mut ignores_to_delete = Vec::new();
        for (parent_path, (_, scan_id)) in &snapshot.ignores {
            // A scan_id matching the current scan means the ignore file was
            // touched in this batch.
            if *scan_id == snapshot.scan_id && snapshot.entry_for_path(parent_path).is_some() {
                ignores_to_update.push(parent_path.clone());
            }

            let ignore_path = parent_path.join(&*GITIGNORE);
            if snapshot.entry_for_path(ignore_path).is_none() {
                ignores_to_delete.push(parent_path.clone());
            }
        }

        for parent_path in ignores_to_delete {
            snapshot.ignores.remove(&parent_path);
            self.snapshot.lock().ignores.remove(&parent_path);
        }

        let (ignore_queue_tx, ignore_queue_rx) = crossbeam_channel::unbounded();
        ignores_to_update.sort_unstable();
        let mut ignores_to_update = ignores_to_update.into_iter().peekable();
        while let Some(parent_path) = ignores_to_update.next() {
            // Skip paths nested under `parent_path` — the recursive update
            // below will cover them.
            while ignores_to_update
                .peek()
                .map_or(false, |p| p.starts_with(&parent_path))
            {
                ignores_to_update.next().unwrap();
            }

            let ignore_stack = snapshot.ignore_stack_for_path(&parent_path, true);
            ignore_queue_tx
                .send(UpdateIgnoreStatusJob {
                    path: parent_path,
                    ignore_stack,
                    ignore_queue: ignore_queue_tx.clone(),
                })
                .unwrap();
        }
        drop(ignore_queue_tx);

        self.thread_pool.scoped(|scope| {
            for _ in 0..self.thread_pool.thread_count() {
                scope.execute(|| {
                    while let Ok(job) = ignore_queue_rx.recv() {
                        self.update_ignore_status(job, &snapshot);
                    }
                });
            }
        });
    }

    /// Re-evaluates the ignore flag of each child of `job.path`, recursing
    /// into subdirectories via the job queue. Only entries whose flag actually
    /// changed are written back.
    fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) {
        let mut ignore_stack = job.ignore_stack;
        if let Some((ignore, _)) = snapshot.ignores.get(&job.path) {
            ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
        }

        let mut edits = Vec::new();
        for mut entry in snapshot.child_entries(&job.path).cloned() {
            let was_ignored = entry.is_ignored;
            entry.is_ignored = ignore_stack.is_path_ignored(entry.path(), entry.is_dir());
            if entry.is_dir() {
                let child_ignore_stack = if entry.is_ignored {
                    IgnoreStack::all()
                } else {
                    ignore_stack.clone()
                };
                job.ignore_queue
                    .send(UpdateIgnoreStatusJob {
                        path: entry.path().clone(),
                        ignore_stack: child_ignore_stack,
                        ignore_queue: job.ignore_queue.clone(),
                    })
                    .unwrap();
            }

            if entry.is_ignored != was_ignored {
                edits.push(Edit::Insert(entry));
            }
        }
        self.snapshot.lock().entries.edit(edits, &());
    }
}
2062
2063fn refresh_entry(snapshot: &Mutex<Snapshot>, path: Arc<Path>, abs_path: &Path) -> Result<Entry> {
2064 let root_char_bag;
2065 let next_entry_id;
2066 {
2067 let snapshot = snapshot.lock();
2068 root_char_bag = snapshot.root_char_bag;
2069 next_entry_id = snapshot.next_entry_id.clone();
2070 }
2071 let entry = fs_entry_for_path(root_char_bag, &next_entry_id, path, abs_path)?
2072 .ok_or_else(|| anyhow!("could not read saved file metadata"))?;
2073 Ok(snapshot.lock().insert_entry(entry))
2074}
2075
2076fn fs_entry_for_path(
2077 root_char_bag: CharBag,
2078 next_entry_id: &AtomicUsize,
2079 path: Arc<Path>,
2080 abs_path: &Path,
2081) -> Result<Option<Entry>> {
2082 let metadata = match fs::metadata(&abs_path) {
2083 Err(err) => {
2084 return match (err.kind(), err.raw_os_error()) {
2085 (io::ErrorKind::NotFound, _) => Ok(None),
2086 (io::ErrorKind::Other, Some(libc::ENOTDIR)) => Ok(None),
2087 _ => Err(anyhow::Error::new(err)),
2088 }
2089 }
2090 Ok(metadata) => metadata,
2091 };
2092 let inode = metadata.ino();
2093 let mtime = metadata.modified()?;
2094 let is_symlink = fs::symlink_metadata(&abs_path)
2095 .context("failed to read symlink metadata")?
2096 .file_type()
2097 .is_symlink();
2098
2099 let entry = Entry {
2100 id: next_entry_id.fetch_add(1, SeqCst),
2101 kind: if metadata.file_type().is_dir() {
2102 EntryKind::PendingDir
2103 } else {
2104 EntryKind::File(char_bag_for_path(root_char_bag, &path))
2105 },
2106 path: Arc::from(path),
2107 inode,
2108 mtime,
2109 is_symlink,
2110 is_ignored: false,
2111 };
2112
2113 Ok(Some(entry))
2114}
2115
2116fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
2117 let mut result = root_char_bag;
2118 result.extend(
2119 path.to_string_lossy()
2120 .chars()
2121 .map(|c| c.to_ascii_lowercase()),
2122 );
2123 result
2124}
2125
/// A unit of work for the directory-scanning thread pool: one directory to read.
struct ScanJob {
    abs_path: PathBuf,
    // Path relative to the worktree root.
    path: Arc<Path>,
    ignore_stack: Arc<IgnoreStack>,
    // Sender for enqueueing this directory's subdirectories; when every clone
    // is dropped, the scan is complete.
    scan_queue: crossbeam_channel::Sender<ScanJob>,
}
2132
/// A unit of work for re-evaluating ignore flags: one directory whose children
/// should be re-checked against `ignore_stack`.
struct UpdateIgnoreStatusJob {
    path: Arc<Path>,
    ignore_stack: Arc<IgnoreStack>,
    // Sender for recursing into subdirectories.
    ignore_queue: crossbeam_channel::Sender<UpdateIgnoreStatusJob>,
}
2138
/// Test-support extensions for worktree model handles.
pub trait WorktreeHandle {
    // Waits until all pending file-system events have been processed. See the
    // impl below for why this is needed.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()>;
}
2146
impl WorktreeHandle for ModelHandle<Worktree> {
    // The worktree's FS event stream sometimes delivers "redundant" events for FS changes that
    // occurred before the worktree was constructed. These events can cause the worktree to perform
    // extra directory scans, and emit extra scan-state notifications.
    //
    // This function mutates the worktree's directory and waits for those mutations to be picked up,
    // to ensure that all redundant FS events have already been processed.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()> {
        use smol::future::FutureExt;

        let filename = "fs-event-sentinel";
        let root_path = cx.read(|cx| self.read(cx).abs_path.clone());
        let tree = self.clone();
        async move {
            // Create and remove a sentinel file, waiting for each change to be
            // observed by the worktree.
            fs::write(root_path.join(filename), "").unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_some())
                .await;

            fs::remove_file(root_path.join(filename)).unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_none())
                .await;

            cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
                .await;
        }
        .boxed_local()
    }
}
2179
/// Iterator over a snapshot's file entries, either all of them or only the
/// non-ignored ("visible") ones.
pub enum FileIter<'a> {
    All(Cursor<'a, Entry, FileCount, ()>),
    Visible(Cursor<'a, Entry, VisibleFileCount, ()>),
}
2184
impl<'a> FileIter<'a> {
    /// Iterates every file, starting at the `start`-th one.
    fn all(snapshot: &'a Snapshot, start: usize) -> Self {
        let mut cursor = snapshot.entries.cursor();
        cursor.seek(&FileCount(start), Bias::Right, &());
        Self::All(cursor)
    }

    /// Iterates non-ignored files, starting at the `start`-th visible one.
    fn visible(snapshot: &'a Snapshot, start: usize) -> Self {
        let mut cursor = snapshot.entries.cursor();
        cursor.seek(&VisibleFileCount(start), Bias::Right, &());
        Self::Visible(cursor)
    }

    // Advances the cursor to the next file (skipping directories) by seeking
    // one past the current file count.
    fn next_internal(&mut self) {
        match self {
            Self::All(cursor) => {
                let ix = *cursor.seek_start();
                cursor.seek_forward(&FileCount(ix.0 + 1), Bias::Right, &());
            }
            Self::Visible(cursor) => {
                let ix = *cursor.seek_start();
                cursor.seek_forward(&VisibleFileCount(ix.0 + 1), Bias::Right, &());
            }
        }
    }

    // The entry the cursor currently points at, if any.
    fn item(&self) -> Option<&'a Entry> {
        match self {
            Self::All(cursor) => cursor.item(),
            Self::Visible(cursor) => cursor.item(),
        }
    }
}
2218
2219impl<'a> Iterator for FileIter<'a> {
2220 type Item = &'a Entry;
2221
2222 fn next(&mut self) -> Option<Self::Item> {
2223 if let Some(entry) = self.item() {
2224 self.next_internal();
2225 Some(entry)
2226 } else {
2227 None
2228 }
2229 }
2230}
2231
/// Iterator over the entries beneath a given parent path.
struct ChildEntriesIter<'a> {
    parent_path: &'a Path,
    cursor: Cursor<'a, Entry, PathSearch<'a>, ()>,
}
2236
impl<'a> ChildEntriesIter<'a> {
    fn new(parent_path: &'a Path, snapshot: &'a Snapshot) -> Self {
        let mut cursor = snapshot.entries.cursor();
        // Bias::Right positions the cursor just past the parent itself, on its
        // first descendant.
        cursor.seek(&PathSearch::Exact(parent_path), Bias::Right, &());
        Self {
            parent_path,
            cursor,
        }
    }
}
2247
2248impl<'a> Iterator for ChildEntriesIter<'a> {
2249 type Item = &'a Entry;
2250
2251 fn next(&mut self) -> Option<Self::Item> {
2252 if let Some(item) = self.cursor.item() {
2253 if item.path().starts_with(self.parent_path) {
2254 self.cursor
2255 .seek_forward(&PathSearch::Successor(item.path()), Bias::Left, &());
2256 Some(item)
2257 } else {
2258 None
2259 }
2260 } else {
2261 None
2262 }
2263 }
2264}
2265
impl<'a> From<&'a Entry> for proto::Entry {
    /// Converts a local entry into its wire representation. The path is sent
    /// as a (lossy) UTF-8 string.
    fn from(entry: &'a Entry) -> Self {
        Self {
            id: entry.id as u64,
            is_dir: entry.is_dir(),
            path: entry.path.to_string_lossy().to_string(),
            inode: entry.inode,
            mtime: Some(entry.mtime.into()),
            is_symlink: entry.is_symlink,
            is_ignored: entry.is_ignored,
        }
    }
}
2279
2280impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
2281 type Error = anyhow::Error;
2282
2283 fn try_from((root_char_bag, entry): (&'a CharBag, proto::Entry)) -> Result<Self> {
2284 if let Some(mtime) = entry.mtime {
2285 let kind = if entry.is_dir {
2286 EntryKind::Dir
2287 } else {
2288 let mut char_bag = root_char_bag.clone();
2289 char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase()));
2290 EntryKind::File(char_bag)
2291 };
2292 let path: Arc<Path> = Arc::from(Path::new(&entry.path));
2293 Ok(Entry {
2294 id: entry.id as usize,
2295 kind,
2296 path: path.clone(),
2297 inode: entry.inode,
2298 mtime: mtime.into(),
2299 is_symlink: entry.is_symlink,
2300 is_ignored: entry.is_ignored,
2301 })
2302 } else {
2303 Err(anyhow!(
2304 "missing mtime in remote worktree entry {:?}",
2305 entry.path
2306 ))
2307 }
2308 }
2309}
2310
2311mod remote {
2312 use super::*;
2313
    /// Handles an `AddPeer` message by forwarding it to the worktree shared
    /// under the message's `worktree_id`.
    pub async fn add_peer(
        envelope: TypedEnvelope<proto::AddPeer>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        rpc.state
            .read()
            .await
            .shared_worktree(envelope.payload.worktree_id, cx)?
            .update(cx, |worktree, cx| worktree.add_peer(envelope, cx))
    }
2325
    /// Handles a `RemovePeer` message by forwarding it to the worktree shared
    /// under the message's `worktree_id`.
    pub async fn remove_peer(
        envelope: TypedEnvelope<proto::RemovePeer>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        rpc.state
            .read()
            .await
            .shared_worktree(envelope.payload.worktree_id, cx)?
            .update(cx, |worktree, cx| worktree.remove_peer(envelope, cx))
    }
2337
    /// Handles an `UpdateWorktree` message by queueing the payload on the
    /// remote worktree's update channel. Fails if the target worktree is
    /// local, since only remote replicas receive snapshot updates.
    pub async fn update_worktree(
        envelope: TypedEnvelope<proto::UpdateWorktree>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        rpc.state
            .read()
            .await
            .shared_worktree(envelope.payload.worktree_id, cx)?
            .update(cx, |worktree, _| {
                if let Some(worktree) = worktree.as_remote_mut() {
                    let mut tx = worktree.updates_tx.clone();
                    // Return a future so the send happens after the model
                    // update closure has finished; the send only fails if the
                    // receiving end was dropped, which the receiver's
                    // contract rules out.
                    Ok(async move {
                        tx.send(envelope.payload)
                            .await
                            .expect("receiver runs to completion");
                    })
                } else {
                    Err(anyhow!(
                        "invalid update message for local worktree {}",
                        envelope.payload.worktree_id
                    ))
                }
            })?
            .await;

        Ok(())
    }
2366
    /// Handles a peer's request to open a buffer in this worktree and sends
    /// back the response produced by `open_remote_buffer`.
    pub async fn open_buffer(
        envelope: TypedEnvelope<proto::OpenBuffer>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        // Capture the receipt before the envelope is moved into the closure.
        let receipt = envelope.receipt();
        let worktree = rpc
            .state
            .read()
            .await
            .shared_worktree(envelope.payload.worktree_id, cx)?;

        // NOTE(review): `unwrap` assumes open-buffer requests are only ever
        // routed to local worktrees; a message addressed to a remote worktree
        // would panic here — confirm against the server's routing.
        let response = worktree
            .update(cx, |worktree, cx| {
                worktree
                    .as_local_mut()
                    .unwrap()
                    .open_remote_buffer(envelope, cx)
            })
            .await?;

        rpc.respond(receipt, response).await?;

        Ok(())
    }
2392
    /// Handles a peer's notification that it closed one of this worktree's
    /// buffers, forwarding it to `close_remote_buffer`.
    pub async fn close_buffer(
        envelope: TypedEnvelope<proto::CloseBuffer>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        let worktree = rpc
            .state
            .read()
            .await
            .shared_worktree(envelope.payload.worktree_id, cx)?;

        // NOTE(review): `unwrap` assumes close-buffer messages only target
        // local worktrees — confirm against the server's routing.
        worktree.update(cx, |worktree, cx| {
            worktree
                .as_local_mut()
                .unwrap()
                .close_remote_buffer(envelope, cx)
        })
    }
2411
    /// Applies buffer operations sent by a peer to the addressed worktree's
    /// buffer via `Worktree::update_buffer`.
    pub async fn update_buffer(
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        let message = envelope.payload;
        rpc.state
            .read()
            .await
            .shared_worktree(message.worktree_id, cx)?
            .update(cx, |tree, cx| tree.update_buffer(message, cx))?;
        Ok(())
    }
2425
2426 pub async fn save_buffer(
2427 envelope: TypedEnvelope<proto::SaveBuffer>,
2428 rpc: &rpc::Client,
2429 cx: &mut AsyncAppContext,
2430 ) -> anyhow::Result<()> {
2431 let state = rpc.state.read().await;
2432 let worktree = state.shared_worktree(envelope.payload.worktree_id, cx)?;
2433 let sender_id = envelope.original_sender_id()?;
2434 let buffer = worktree.read_with(cx, |tree, _| {
2435 tree.as_local()
2436 .unwrap()
2437 .shared_buffers
2438 .get(&sender_id)
2439 .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
2440 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))
2441 })?;
2442 let version = buffer.update(cx, |buffer, cx| buffer.save(cx))?.await?;
2443 rpc.respond(
2444 envelope.receipt(),
2445 proto::BufferSaved {
2446 worktree_id: envelope.payload.worktree_id,
2447 buffer_id: envelope.payload.buffer_id,
2448 version: (&version).into(),
2449 },
2450 )
2451 .await?;
2452 Ok(())
2453 }
2454
    /// Handles a `BufferSaved` notification by forwarding the payload to the
    /// addressed worktree's `buffer_saved` method.
    pub async fn buffer_saved(
        envelope: TypedEnvelope<proto::BufferSaved>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        rpc.state
            .read()
            .await
            .shared_worktree(envelope.payload.worktree_id, cx)?
            .update(cx, |worktree, cx| {
                worktree.buffer_saved(envelope.payload, cx)
            })?;
        Ok(())
    }
2469}
2470
2471#[cfg(test)]
2472mod tests {
2473 use super::*;
2474 use crate::test::*;
2475 use anyhow::Result;
2476 use rand::prelude::*;
2477 use serde_json::json;
2478 use std::time::UNIX_EPOCH;
2479 use std::{env, fmt::Write, os::unix, time::SystemTime};
2480
    #[gpui::test]
    async fn test_populate_and_search(mut cx: gpui::TestAppContext) {
        // Build a small tree that will be accessed through a symlinked root,
        // with a symlinked directory inside it ("finnochio" -> "fennel").
        let dir = temp_tree(json!({
            "root": {
                "apple": "",
                "banana": {
                    "carrot": {
                        "date": "",
                        "endive": "",
                    }
                },
                "fennel": {
                    "grape": "",
                }
            }
        }));

        let root_link_path = dir.path().join("root_link");
        unix::fs::symlink(&dir.path().join("root"), &root_link_path).unwrap();
        unix::fs::symlink(
            &dir.path().join("root/fennel"),
            &dir.path().join("root/finnochio"),
        )
        .unwrap();

        let tree = cx.add_model(|cx| Worktree::local(root_link_path, Default::default(), cx));

        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        cx.read(|cx| {
            let tree = tree.read(cx);
            assert_eq!(tree.file_count(), 5);

            // The symlinked directory resolves to the same underlying inode.
            assert_eq!(
                tree.inode_for_path("fennel/grape"),
                tree.inode_for_path("finnochio/grape")
            );

            // Fuzzy-matching "bna" should surface only the "banana" paths.
            let results = match_paths(
                Some(tree.snapshot()).iter(),
                "bna",
                false,
                false,
                false,
                10,
                Default::default(),
                cx.thread_pool().clone(),
            )
            .into_iter()
            .map(|result| result.path)
            .collect::<Vec<Arc<Path>>>();
            assert_eq!(
                results,
                vec![
                    PathBuf::from("banana/carrot/date").into(),
                    PathBuf::from("banana/carrot/endive").into(),
                ]
            );
        })
    }
2541
    #[gpui::test]
    async fn test_save_file(mut cx: gpui::TestAppContext) {
        let app_state = cx.read(build_app_state);
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));
        let tree = cx.add_model(|cx| Worktree::local(dir.path(), app_state.languages, cx));
        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        // Insert a large block of text (~160 KB) so the save exercises more
        // than a trivial write, then persist it to disk.
        let save = buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        // The file on disk must now match the in-memory buffer contents.
        let new_text = fs::read_to_string(dir.path().join("file1")).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
2562
    #[gpui::test]
    async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) {
        let app_state = cx.read(build_app_state);
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));
        let file_path = dir.path().join("file1");

        // The worktree root here is the file itself, not a directory.
        let tree = cx.add_model(|cx| Worktree::local(file_path.clone(), app_state.languages, cx));
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        cx.read(|cx| assert_eq!(tree.read(cx).file_count(), 1));

        // In a single-file worktree, the empty path opens the root file.
        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("", cx))
            .await
            .unwrap();
        let save = buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        // The file on disk must now match the in-memory buffer contents.
        let new_text = fs::read_to_string(file_path).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
2589
    #[gpui::test]
    async fn test_rescan_and_remote_updates(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "a": {
                "file1": "",
                "file2": "",
                "file3": "",
            },
            "b": {
                "c": {
                    "file4": "",
                    "file5": "",
                }
            }
        }));

        let tree = cx.add_model(|cx| Worktree::local(dir.path(), Default::default(), cx));

        // Helper: open a buffer for a path within this worktree.
        let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| {
            let buffer = tree.update(cx, |tree, cx| tree.open_buffer(path, cx));
            async move { buffer.await.unwrap() }
        };
        // Helper: the entry id currently associated with a path.
        let id_for_path = |path: &'static str, cx: &gpui::TestAppContext| {
            tree.read_with(cx, |tree, _| {
                tree.entry_for_path(path)
                    .expect(&format!("no entry for path {}", path))
                    .id
            })
        };

        let buffer2 = buffer_for_path("a/file2", &mut cx).await;
        let buffer3 = buffer_for_path("a/file3", &mut cx).await;
        let buffer4 = buffer_for_path("b/c/file4", &mut cx).await;
        let buffer5 = buffer_for_path("b/c/file5", &mut cx).await;

        let file2_id = id_for_path("a/file2", &cx);
        let file3_id = id_for_path("a/file3", &cx);
        let file4_id = id_for_path("b/c/file4", &cx);

        // Wait for the initial scan.
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;

        // Create a remote copy of this worktree.
        let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
        let worktree_id = 1;
        let share_request = tree
            .update(&mut cx, |tree, cx| {
                tree.as_local().unwrap().share_request(cx)
            })
            .await;
        let remote = Worktree::remote(
            proto::OpenWorktreeResponse {
                worktree_id,
                worktree: share_request.worktree,
                replica_id: 1,
                peers: Vec::new(),
            },
            rpc::Client::new(Default::default()),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        // Renames/deletions on disk must not dirty the open buffers.
        cx.read(|cx| {
            assert!(!buffer2.read(cx).is_dirty());
            assert!(!buffer3.read(cx).is_dirty());
            assert!(!buffer4.read(cx).is_dirty());
            assert!(!buffer5.read(cx).is_dirty());
        });

        // Rename and delete files and directories.
        tree.flush_fs_events(&cx).await;
        std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
        std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
        std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap();
        std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
        tree.flush_fs_events(&cx).await;

        let expected_paths = vec![
            "a",
            "a/file1",
            "a/file2.new",
            "b",
            "d",
            "d/file3",
            "d/file4",
        ];

        cx.read(|app| {
            assert_eq!(
                tree.read(app)
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );

            // Entry ids survive renames, so the moved files keep their ids.
            assert_eq!(id_for_path("a/file2.new", &cx), file2_id);
            assert_eq!(id_for_path("d/file3", &cx), file3_id);
            assert_eq!(id_for_path("d/file4", &cx), file4_id);

            // Open buffers track their files across renames...
            assert_eq!(
                buffer2.read(app).file().unwrap().path().as_ref(),
                Path::new("a/file2.new")
            );
            assert_eq!(
                buffer3.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file3")
            );
            assert_eq!(
                buffer4.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file4")
            );
            // ...while a deleted file keeps its last-known path.
            assert_eq!(
                buffer5.read(app).file().unwrap().path().as_ref(),
                Path::new("b/c/file5")
            );

            assert!(!buffer2.read(app).file().unwrap().is_deleted());
            assert!(!buffer3.read(app).file().unwrap().is_deleted());
            assert!(!buffer4.read(app).file().unwrap().is_deleted());
            assert!(buffer5.read(app).file().unwrap().is_deleted());
        });

        // Update the remote worktree. Check that it becomes consistent with the
        // local worktree.
        remote.update(&mut cx, |remote, cx| {
            let update_message = tree
                .read(cx)
                .snapshot()
                .build_update(&initial_snapshot, worktree_id);
            remote
                .as_remote_mut()
                .unwrap()
                .snapshot
                .apply_update(update_message)
                .unwrap();

            assert_eq!(
                remote
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );
        });
    }
2739
2740 #[gpui::test]
2741 async fn test_rescan_with_gitignore(mut cx: gpui::TestAppContext) {
2742 let dir = temp_tree(json!({
2743 ".git": {},
2744 ".gitignore": "ignored-dir\n",
2745 "tracked-dir": {
2746 "tracked-file1": "tracked contents",
2747 },
2748 "ignored-dir": {
2749 "ignored-file1": "ignored contents",
2750 }
2751 }));
2752
2753 let tree = cx.add_model(|cx| Worktree::local(dir.path(), Default::default(), cx));
2754 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
2755 .await;
2756 tree.flush_fs_events(&cx).await;
2757 cx.read(|cx| {
2758 let tree = tree.read(cx);
2759 let tracked = tree.entry_for_path("tracked-dir/tracked-file1").unwrap();
2760 let ignored = tree.entry_for_path("ignored-dir/ignored-file1").unwrap();
2761 assert_eq!(tracked.is_ignored(), false);
2762 assert_eq!(ignored.is_ignored(), true);
2763 });
2764
2765 fs::write(dir.path().join("tracked-dir/tracked-file2"), "").unwrap();
2766 fs::write(dir.path().join("ignored-dir/ignored-file2"), "").unwrap();
2767 tree.flush_fs_events(&cx).await;
2768 cx.read(|cx| {
2769 let tree = tree.read(cx);
2770 let dot_git = tree.entry_for_path(".git").unwrap();
2771 let tracked = tree.entry_for_path("tracked-dir/tracked-file2").unwrap();
2772 let ignored = tree.entry_for_path("ignored-dir/ignored-file2").unwrap();
2773 assert_eq!(tracked.is_ignored(), false);
2774 assert_eq!(ignored.is_ignored(), true);
2775 assert_eq!(dot_git.is_ignored(), true);
2776 });
2777 }
2778
    #[test]
    fn test_random() {
        // Fuzz parameters are overridable via environment variables; setting
        // SEED pins the run to a single seed for reproduction.
        let iterations = env::var("ITERATIONS")
            .map(|i| i.parse().unwrap())
            .unwrap_or(100);
        let operations = env::var("OPERATIONS")
            .map(|o| o.parse().unwrap())
            .unwrap_or(40);
        let initial_entries = env::var("INITIAL_ENTRIES")
            .map(|o| o.parse().unwrap())
            .unwrap_or(20);
        let seeds = if let Ok(seed) = env::var("SEED").map(|s| s.parse().unwrap()) {
            seed..seed + 1
        } else {
            0..iterations
        };

        for seed in seeds {
            dbg!(seed);
            let mut rng = StdRng::seed_from_u64(seed);

            // Populate a real on-disk tree with random entries.
            let root_dir = tempdir::TempDir::new(&format!("test-{}", seed)).unwrap();
            for _ in 0..initial_entries {
                randomly_mutate_tree(root_dir.path(), 1.0, &mut rng).unwrap();
            }
            log::info!("Generated initial tree");

            let (notify_tx, _notify_rx) = smol::channel::unbounded();
            let mut scanner = BackgroundScanner::new(
                Arc::new(Mutex::new(Snapshot {
                    id: 0,
                    scan_id: 0,
                    abs_path: root_dir.path().into(),
                    entries: Default::default(),
                    paths_by_id: Default::default(),
                    removed_entry_ids: Default::default(),
                    ignores: Default::default(),
                    root_name: Default::default(),
                    root_char_bag: Default::default(),
                    next_entry_id: Default::default(),
                })),
                notify_tx,
                0,
            );
            scanner.scan_dirs().unwrap();
            scanner.snapshot().check_invariants();

            // Interleave filesystem mutations with batched (and therefore
            // possibly delayed) event deliveries, checking the snapshot's
            // invariants after every delivered batch.
            let mut events = Vec::new();
            let mut mutations_len = operations;
            while mutations_len > 1 {
                if !events.is_empty() && rng.gen_bool(0.4) {
                    let len = rng.gen_range(0..=events.len());
                    let to_deliver = events.drain(0..len).collect::<Vec<_>>();
                    log::info!("Delivering events: {:#?}", to_deliver);
                    scanner.process_events(to_deliver);
                    scanner.snapshot().check_invariants();
                } else {
                    events.extend(randomly_mutate_tree(root_dir.path(), 0.6, &mut rng).unwrap());
                    mutations_len -= 1;
                }
            }
            // Deliver any remaining events so the scanner quiesces.
            log::info!("Quiescing: {:#?}", events);
            scanner.process_events(events);
            scanner.snapshot().check_invariants();

            // A fresh scan of the final on-disk state must match the
            // incrementally-updated scanner's snapshot.
            let (notify_tx, _notify_rx) = smol::channel::unbounded();
            let mut new_scanner = BackgroundScanner::new(
                Arc::new(Mutex::new(Snapshot {
                    id: 0,
                    scan_id: 0,
                    abs_path: root_dir.path().into(),
                    entries: Default::default(),
                    paths_by_id: Default::default(),
                    removed_entry_ids: Default::default(),
                    ignores: Default::default(),
                    root_name: Default::default(),
                    root_char_bag: Default::default(),
                    next_entry_id: Default::default(),
                })),
                notify_tx,
                1,
            );
            new_scanner.scan_dirs().unwrap();
            assert_eq!(scanner.snapshot().to_vec(), new_scanner.snapshot().to_vec());
        }
    }
2865
    /// Performs one random filesystem mutation under `root_path` and returns
    /// the synthetic fsevents a watcher would have produced for it.
    ///
    /// With probability `insertion_probability` (or always, when the tree
    /// contains nothing but the root) a new file or directory is created;
    /// otherwise a `.gitignore` is occasionally written, or an existing
    /// entry is renamed or deleted.
    fn randomly_mutate_tree(
        root_path: &Path,
        insertion_probability: f64,
        rng: &mut impl Rng,
    ) -> Result<Vec<fsevent::Event>> {
        let root_path = root_path.canonicalize().unwrap();
        let (dirs, files) = read_dir_recursive(root_path.clone());

        let mut events = Vec::new();
        let mut record_event = |path: PathBuf| {
            events.push(fsevent::Event {
                // Event ids only need to be monotonic-ish; wall-clock seconds
                // are good enough for the test scanner.
                event_id: SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                flags: fsevent::StreamFlags::empty(),
                path,
            });
        };

        if (files.is_empty() && dirs.len() == 1) || rng.gen_bool(insertion_probability) {
            // Create a new file or directory under a randomly-chosen dir.
            let path = dirs.choose(rng).unwrap();
            let new_path = path.join(gen_name(rng));

            if rng.gen() {
                log::info!("Creating dir {:?}", new_path.strip_prefix(root_path)?);
                fs::create_dir(&new_path)?;
            } else {
                log::info!("Creating file {:?}", new_path.strip_prefix(root_path)?);
                fs::write(&new_path, "")?;
            }
            record_event(new_path);
        } else if rng.gen_bool(0.05) {
            // Write a `.gitignore` listing a random subset of one
            // directory's descendants.
            let ignore_dir_path = dirs.choose(rng).unwrap();
            let ignore_path = ignore_dir_path.join(&*GITIGNORE);

            let (subdirs, subfiles) = read_dir_recursive(ignore_dir_path.clone());
            let files_to_ignore = {
                let len = rng.gen_range(0..=subfiles.len());
                subfiles.choose_multiple(rng, len)
            };
            let dirs_to_ignore = {
                let len = rng.gen_range(0..subdirs.len());
                subdirs.choose_multiple(rng, len)
            };

            let mut ignore_contents = String::new();
            for path_to_ignore in files_to_ignore.chain(dirs_to_ignore) {
                write!(
                    ignore_contents,
                    "{}\n",
                    path_to_ignore
                        .strip_prefix(&ignore_dir_path)?
                        .to_str()
                        .unwrap()
                )
                .unwrap();
            }
            log::info!(
                "Creating {:?} with contents:\n{}",
                ignore_path.strip_prefix(&root_path)?,
                ignore_contents
            );
            fs::write(&ignore_path, ignore_contents).unwrap();
            record_event(ignore_path);
        } else {
            // Pick an existing file or non-root directory to rename/delete.
            let old_path = {
                let file_path = files.choose(rng);
                let dir_path = dirs[1..].choose(rng);
                file_path.into_iter().chain(dir_path).choose(rng).unwrap()
            };

            let is_rename = rng.gen();
            if is_rename {
                // Move it under a directory that is not its own descendant,
                // sometimes replacing an existing directory entirely.
                let new_path_parent = dirs
                    .iter()
                    .filter(|d| !d.starts_with(old_path))
                    .choose(rng)
                    .unwrap();

                let overwrite_existing_dir =
                    !old_path.starts_with(&new_path_parent) && rng.gen_bool(0.3);
                let new_path = if overwrite_existing_dir {
                    fs::remove_dir_all(&new_path_parent).ok();
                    new_path_parent.to_path_buf()
                } else {
                    new_path_parent.join(gen_name(rng))
                };

                log::info!(
                    "Renaming {:?} to {}{:?}",
                    old_path.strip_prefix(&root_path)?,
                    if overwrite_existing_dir {
                        "overwrite "
                    } else {
                        ""
                    },
                    new_path.strip_prefix(&root_path)?
                );
                fs::rename(&old_path, &new_path)?;
                record_event(old_path.clone());
                record_event(new_path);
            } else if old_path.is_dir() {
                let (dirs, files) = read_dir_recursive(old_path.clone());

                log::info!("Deleting dir {:?}", old_path.strip_prefix(&root_path)?);
                fs::remove_dir_all(&old_path).unwrap();
                // Emit one event per removed descendant, files first.
                for file in files {
                    record_event(file);
                }
                for dir in dirs {
                    record_event(dir);
                }
            } else {
                log::info!("Deleting file {:?}", old_path.strip_prefix(&root_path)?);
                fs::remove_file(old_path).unwrap();
                record_event(old_path.clone());
            }
        }

        Ok(events)
    }
2988
2989 fn read_dir_recursive(path: PathBuf) -> (Vec<PathBuf>, Vec<PathBuf>) {
2990 let child_entries = fs::read_dir(&path).unwrap();
2991 let mut dirs = vec![path];
2992 let mut files = Vec::new();
2993 for child_entry in child_entries {
2994 let child_path = child_entry.unwrap().path();
2995 if child_path.is_dir() {
2996 let (child_dirs, child_files) = read_dir_recursive(child_path);
2997 dirs.extend(child_dirs);
2998 files.extend(child_files);
2999 } else {
3000 files.push(child_path);
3001 }
3002 }
3003 (dirs, files)
3004 }
3005
3006 fn gen_name(rng: &mut impl Rng) -> String {
3007 (0..6)
3008 .map(|_| rng.sample(rand::distributions::Alphanumeric))
3009 .map(char::from)
3010 .collect()
3011 }
3012
    impl Snapshot {
        /// Verifies internal consistency of a snapshot:
        /// - the file iterators agree with the entry tree,
        /// - an iterative traversal via `child_entries` visits exactly the
        ///   same paths in the same order as the flat entry cursor,
        /// - every tracked ignore root still has matching entries.
        fn check_invariants(&self) {
            let mut files = self.files(0);
            let mut visible_files = self.visible_files(0);
            for entry in self.entries.cursor::<(), ()>() {
                if entry.is_file() {
                    assert_eq!(files.next().unwrap().inode(), entry.inode);
                    if !entry.is_ignored {
                        assert_eq!(visible_files.next().unwrap().inode(), entry.inode);
                    }
                }
            }
            // Both file iterators must be exhausted together with the tree.
            assert!(files.next().is_none());
            assert!(visible_files.next().is_none());

            // Rebuild the path order using only `child_entries`, starting
            // from the root (""). Children are inserted at the stack's old
            // length so they pop in original order before their siblings.
            let mut bfs_paths = Vec::new();
            let mut stack = vec![Path::new("")];
            while let Some(path) = stack.pop() {
                bfs_paths.push(path);
                let ix = stack.len();
                for child_entry in self.child_entries(path) {
                    stack.insert(ix, child_entry.path());
                }
            }

            let dfs_paths = self
                .entries
                .cursor::<(), ()>()
                .map(|e| e.path().as_ref())
                .collect::<Vec<_>>();
            assert_eq!(bfs_paths, dfs_paths);

            // Each ignore root must still exist, along with its .gitignore.
            for (ignore_parent_path, _) in &self.ignores {
                assert!(self.entry_for_path(ignore_parent_path).is_some());
                assert!(self
                    .entry_for_path(ignore_parent_path.join(&*GITIGNORE))
                    .is_some());
            }
        }

        /// Flattens the snapshot to a path-sorted list of
        /// `(path, inode, is_ignored)` tuples, used for order-insensitive
        /// comparison between two scanners' snapshots.
        fn to_vec(&self) -> Vec<(&Path, u64, bool)> {
            let mut paths = Vec::new();
            for entry in self.entries.cursor::<(), ()>() {
                paths.push((entry.path().as_ref(), entry.inode(), entry.is_ignored()));
            }
            paths.sort_by(|a, b| a.0.cmp(&b.0));
            paths
        }
    }
3062}