1mod ignore;
2
3use self::ignore::IgnoreStack;
4use crate::{
5 editor::{self, Buffer, History, Operation, Rope},
6 fs::{self, Fs},
7 fuzzy,
8 fuzzy::CharBag,
9 language::LanguageRegistry,
10 rpc::{self, proto},
11 time::{self, ReplicaId},
12 util::{Bias, TryFutureExt},
13};
14use ::ignore::gitignore::Gitignore;
15use anyhow::{anyhow, Result};
16use futures::{Stream, StreamExt};
17pub use fuzzy::{match_paths, PathMatch};
18use gpui::{
19 executor,
20 sum_tree::{self, Cursor, Edit, SumTree},
21 AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task,
22 UpgradeModelHandle, WeakModelHandle,
23};
24use lazy_static::lazy_static;
25use parking_lot::Mutex;
26use postage::{
27 prelude::{Sink as _, Stream as _},
28 watch,
29};
30use smol::channel::{self, Sender};
31use std::{
32 cmp::{self, Ordering},
33 collections::HashMap,
34 convert::{TryFrom, TryInto},
35 ffi::{OsStr, OsString},
36 fmt,
37 future::Future,
38 ops::Deref,
39 path::{Path, PathBuf},
40 sync::{
41 atomic::{AtomicUsize, Ordering::SeqCst},
42 Arc,
43 },
44 time::{Duration, SystemTime},
45};
46use zrpc::{PeerId, TypedEnvelope};
47
lazy_static! {
    // File name that marks a gitignore file; presumably compared against
    // directory-entry names during scanning (see the `ignore` module).
    static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
}
51
// Progress state reported by the background scanner over its channel.
#[derive(Clone, Debug)]
enum ScanState {
    // No scan in progress; the snapshot is up to date.
    Idle,
    // A filesystem scan is running; the snapshot may be incomplete.
    Scanning,
    // The scan failed; the error is wrapped in `Arc` so the state stays `Clone`.
    Err(Arc<anyhow::Error>),
}
58
// A worktree is either backed by the local filesystem or mirrors a tree
// hosted by a remote collaborator over RPC.
pub enum Worktree {
    Local(LocalWorktree),
    Remote(RemoteWorktree),
}
63
64impl Entity for Worktree {
65 type Event = ();
66
67 fn release(&mut self, cx: &mut MutableAppContext) {
68 let rpc = match self {
69 Self::Local(tree) => tree
70 .share
71 .as_ref()
72 .map(|share| (share.rpc.clone(), share.remote_id)),
73 Self::Remote(tree) => Some((tree.rpc.clone(), tree.remote_id)),
74 };
75
76 if let Some((rpc, worktree_id)) = rpc {
77 cx.spawn(|_| async move {
78 if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await {
79 log::error!("error closing worktree {}: {}", worktree_id, err);
80 }
81 })
82 .detach();
83 }
84 }
85}
86
87impl Worktree {
    /// Create a worktree rooted at `path` on the local filesystem and start
    /// the background scanner task that keeps its snapshot up to date.
    pub async fn open_local(
        path: impl Into<Arc<Path>>,
        languages: Arc<LanguageRegistry>,
        fs: Arc<dyn Fs>,
        cx: &mut AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let (tree, scan_states_tx) = LocalWorktree::new(path, languages, fs.clone(), cx).await?;
        tree.update(cx, |tree, cx| {
            let tree = tree.as_local_mut().unwrap();
            let abs_path = tree.snapshot.abs_path.clone();
            let background_snapshot = tree.background_snapshot.clone();
            let background = cx.background().clone();
            // Watch and scan on the background executor; progress is reported
            // back through `scan_states_tx` (consumed in `LocalWorktree::new`).
            tree._background_scanner_task = Some(cx.background().spawn(async move {
                // NOTE(review): 100ms looks like an fs-event debounce interval —
                // confirm against the `Fs::watch` contract.
                let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
                let scanner =
                    BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
                scanner.run(events).await;
            }));
        });
        Ok(tree)
    }
109
110 pub async fn open_remote(
111 rpc: Arc<rpc::Client>,
112 id: u64,
113 access_token: String,
114 languages: Arc<LanguageRegistry>,
115 cx: &mut AsyncAppContext,
116 ) -> Result<ModelHandle<Self>> {
117 let response = rpc
118 .request(proto::OpenWorktree {
119 worktree_id: id,
120 access_token,
121 })
122 .await?;
123
124 Worktree::remote(response, rpc, languages, cx).await
125 }
126
    /// Build a `Worktree::Remote` from the server's `OpenWorktreeResponse`:
    /// deserialize the entry list off the main thread, then wire up the
    /// update channel, the snapshot observer, and the RPC subscriptions.
    async fn remote(
        open_response: proto::OpenWorktreeResponse,
        rpc: Arc<rpc::Client>,
        languages: Arc<LanguageRegistry>,
        cx: &mut AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let worktree = open_response
            .worktree
            .ok_or_else(|| anyhow!("empty worktree"))?;

        let remote_id = open_response.worktree_id;
        let replica_id = open_response.replica_id as ReplicaId;
        let peers = open_response.peers;
        // Lower-cased characters of the root name, used for fuzzy matching.
        let root_char_bag: CharBag = worktree
            .root_name
            .chars()
            .map(|c| c.to_ascii_lowercase())
            .collect();
        let root_name = worktree.root_name.clone();
        // Convert the protobuf entries into the snapshot's two indices
        // (ordered by path and by id) on the background executor.
        let (entries_by_path, entries_by_id) = cx
            .background()
            .spawn(async move {
                let mut entries_by_path_edits = Vec::new();
                let mut entries_by_id_edits = Vec::new();
                for entry in worktree.entries {
                    match Entry::try_from((&root_char_bag, entry)) {
                        Ok(entry) => {
                            entries_by_id_edits.push(Edit::Insert(PathEntry {
                                id: entry.id,
                                path: entry.path.clone(),
                                scan_id: 0,
                            }));
                            entries_by_path_edits.push(Edit::Insert(entry));
                        }
                        // Skip entries that fail to deserialize rather than
                        // rejecting the whole worktree.
                        Err(err) => log::warn!("error for remote worktree entry {:?}", err),
                    }
                }

                let mut entries_by_path = SumTree::new();
                let mut entries_by_id = SumTree::new();
                entries_by_path.edit(entries_by_path_edits, &());
                entries_by_id.edit(entries_by_id_edits, &());
                (entries_by_path, entries_by_id)
            })
            .await;

        let worktree = cx.update(|cx| {
            cx.add_model(|cx: &mut ModelContext<Worktree>| {
                let snapshot = Snapshot {
                    id: cx.model_id(),
                    scan_id: 0,
                    // Remote trees have no local filesystem location.
                    abs_path: Path::new("").into(),
                    root_name,
                    root_char_bag,
                    ignores: Default::default(),
                    entries_by_path,
                    entries_by_id,
                    removed_entry_ids: Default::default(),
                    next_entry_id: Default::default(),
                };

                // Updates from the host arrive through `updates_tx`; the
                // resulting snapshots are published on `snapshot_rx`.
                let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
                let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());

                // Task 1: apply each incoming update to the latest snapshot.
                cx.background()
                    .spawn(async move {
                        while let Some(update) = updates_rx.recv().await {
                            let mut snapshot = snapshot_tx.borrow().clone();
                            if let Err(error) = snapshot.apply_update(update) {
                                log::error!("error applying worktree update: {}", error);
                            }
                            *snapshot_tx.borrow_mut() = snapshot;
                        }
                    })
                    .detach();

                // Task 2: whenever a new snapshot is published, re-poll the
                // model; stops once the model has been dropped.
                {
                    let mut snapshot_rx = snapshot_rx.clone();
                    cx.spawn_weak(|this, mut cx| async move {
                        while let Some(_) = snapshot_rx.recv().await {
                            if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
                                this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
                            } else {
                                break;
                            }
                        }
                    })
                    .detach();
                }

                // Keep the RPC message handlers alive for the model's lifetime.
                let _subscriptions = vec![
                    rpc.subscribe_from_model(remote_id, cx, Self::handle_add_peer),
                    rpc.subscribe_from_model(remote_id, cx, Self::handle_remove_peer),
                    rpc.subscribe_from_model(remote_id, cx, Self::handle_update),
                    rpc.subscribe_from_model(remote_id, cx, Self::handle_update_buffer),
                    rpc.subscribe_from_model(remote_id, cx, Self::handle_buffer_saved),
                ];

                Worktree::Remote(RemoteWorktree {
                    remote_id,
                    replica_id,
                    snapshot,
                    snapshot_rx,
                    updates_tx,
                    rpc: rpc.clone(),
                    open_buffers: Default::default(),
                    peers: peers
                        .into_iter()
                        .map(|p| (PeerId(p.peer_id), p.replica_id as ReplicaId))
                        .collect(),
                    queued_operations: Default::default(),
                    languages,
                    _subscriptions,
                })
            })
        });

        Ok(worktree)
    }
246
247 pub fn as_local(&self) -> Option<&LocalWorktree> {
248 if let Worktree::Local(worktree) = self {
249 Some(worktree)
250 } else {
251 None
252 }
253 }
254
255 pub fn as_local_mut(&mut self) -> Option<&mut LocalWorktree> {
256 if let Worktree::Local(worktree) = self {
257 Some(worktree)
258 } else {
259 None
260 }
261 }
262
263 pub fn as_remote_mut(&mut self) -> Option<&mut RemoteWorktree> {
264 if let Worktree::Remote(worktree) = self {
265 Some(worktree)
266 } else {
267 None
268 }
269 }
270
    /// The language registry used to assign a language to opened buffers.
    pub fn languages(&self) -> &Arc<LanguageRegistry> {
        match self {
            Worktree::Local(worktree) => &worktree.languages,
            Worktree::Remote(worktree) => &worktree.languages,
        }
    }
277
    /// Clone of the current foreground snapshot, regardless of flavor.
    pub fn snapshot(&self) -> Snapshot {
        match self {
            Worktree::Local(worktree) => worktree.snapshot(),
            Worktree::Remote(worktree) => worktree.snapshot(),
        }
    }
284
285 pub fn replica_id(&self) -> ReplicaId {
286 match self {
287 Worktree::Local(_) => 0,
288 Worktree::Remote(worktree) => worktree.replica_id,
289 }
290 }
291
    /// RPC handler: a peer joined this worktree; record their replica id.
    pub fn handle_add_peer(
        &mut self,
        envelope: TypedEnvelope<proto::AddPeer>,
        _: Arc<rpc::Client>,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        match self {
            Worktree::Local(worktree) => worktree.add_peer(envelope, cx),
            Worktree::Remote(worktree) => worktree.add_peer(envelope, cx),
        }
    }
303
    /// RPC handler: a peer left this worktree; forget them.
    pub fn handle_remove_peer(
        &mut self,
        envelope: TypedEnvelope<proto::RemovePeer>,
        _: Arc<rpc::Client>,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        match self {
            Worktree::Local(worktree) => worktree.remove_peer(envelope, cx),
            Worktree::Remote(worktree) => worktree.remove_peer(envelope, cx),
        }
    }
315
    /// RPC handler: the host sent a worktree diff. Only remote worktrees
    /// receive this message, hence the `unwrap`.
    pub fn handle_update(
        &mut self,
        envelope: TypedEnvelope<proto::UpdateWorktree>,
        _: Arc<rpc::Client>,
        cx: &mut ModelContext<Self>,
    ) -> anyhow::Result<()> {
        self.as_remote_mut()
            .unwrap()
            .update_from_remote(envelope, cx)
    }
326
    /// RPC handler: a guest asked to open a buffer in this (local, shared)
    /// worktree. Loads the buffer and responds with its serialized state.
    /// Only the host receives this message, hence the `unwrap`.
    pub fn handle_open_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::OpenBuffer>,
        rpc: Arc<rpc::Client>,
        cx: &mut ModelContext<Self>,
    ) -> anyhow::Result<()> {
        let receipt = envelope.receipt();

        let response = self
            .as_local_mut()
            .unwrap()
            .open_remote_buffer(envelope, cx);

        // Respond asynchronously; failures are logged by `log_err` rather
        // than propagated.
        cx.background()
            .spawn(
                async move {
                    rpc.respond(receipt, response.await?).await?;
                    Ok(())
                }
                .log_err(),
            )
            .detach();

        Ok(())
    }
352
    /// RPC handler: a guest closed one of our shared buffers. Only the host
    /// receives this message, hence the `unwrap`.
    pub fn handle_close_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::CloseBuffer>,
        _: Arc<rpc::Client>,
        cx: &mut ModelContext<Self>,
    ) -> anyhow::Result<()> {
        self.as_local_mut()
            .unwrap()
            .close_remote_buffer(envelope, cx)
    }
363
    /// Peers currently collaborating on this worktree, with their replica ids.
    pub fn peers(&self) -> &HashMap<PeerId, ReplicaId> {
        match self {
            Worktree::Local(worktree) => &worktree.peers,
            Worktree::Remote(worktree) => &worktree.peers,
        }
    }
370
    /// Open (or reuse) a buffer for a worktree-relative path, dispatching to
    /// the local or remote implementation.
    pub fn open_buffer(
        &mut self,
        path: impl AsRef<Path>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        match self {
            Worktree::Local(worktree) => worktree.open_buffer(path.as_ref(), cx),
            Worktree::Remote(worktree) => worktree.open_buffer(path.as_ref(), cx),
        }
    }
381
382 #[cfg(feature = "test-support")]
383 pub fn has_open_buffer(&self, path: impl AsRef<Path>, cx: &AppContext) -> bool {
384 let mut open_buffers: Box<dyn Iterator<Item = _>> = match self {
385 Worktree::Local(worktree) => Box::new(worktree.open_buffers.values()),
386 Worktree::Remote(worktree) => {
387 Box::new(worktree.open_buffers.values().filter_map(|buf| {
388 if let RemoteBuffer::Loaded(buf) = buf {
389 Some(buf)
390 } else {
391 None
392 }
393 }))
394 }
395 };
396
397 let path = path.as_ref();
398 open_buffers
399 .find(|buffer| {
400 if let Some(file) = buffer.upgrade(cx).and_then(|buffer| buffer.read(cx).file()) {
401 file.path.as_ref() == path
402 } else {
403 false
404 }
405 })
406 .is_some()
407 }
408
409 pub fn handle_update_buffer(
410 &mut self,
411 envelope: TypedEnvelope<proto::UpdateBuffer>,
412 _: Arc<rpc::Client>,
413 cx: &mut ModelContext<Self>,
414 ) -> Result<()> {
415 let payload = envelope.payload.clone();
416 let buffer_id = payload.buffer_id as usize;
417 let ops = payload
418 .operations
419 .into_iter()
420 .map(|op| op.try_into())
421 .collect::<anyhow::Result<Vec<_>>>()?;
422
423 match self {
424 Worktree::Local(worktree) => {
425 let buffer = worktree
426 .open_buffers
427 .get(&buffer_id)
428 .and_then(|buf| buf.upgrade(cx))
429 .ok_or_else(|| {
430 anyhow!("invalid buffer {} in update buffer message", buffer_id)
431 })?;
432 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
433 }
434 Worktree::Remote(worktree) => match worktree.open_buffers.get_mut(&buffer_id) {
435 Some(RemoteBuffer::Operations(pending_ops)) => pending_ops.extend(ops),
436 Some(RemoteBuffer::Loaded(buffer)) => {
437 if let Some(buffer) = buffer.upgrade(cx) {
438 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
439 } else {
440 worktree
441 .open_buffers
442 .insert(buffer_id, RemoteBuffer::Operations(ops));
443 }
444 }
445 None => {
446 worktree
447 .open_buffers
448 .insert(buffer_id, RemoteBuffer::Operations(ops));
449 }
450 },
451 }
452
453 Ok(())
454 }
455
    /// RPC handler: a guest asked us (the host) to save one of the buffers
    /// we are sharing with them. Saves it and replies with the resulting
    /// version vector and mtime.
    pub fn handle_save_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        rpc: Arc<rpc::Client>,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        let sender_id = envelope.original_sender_id()?;
        // The buffer must already be in the sender's shared-buffer set.
        let buffer = self
            .as_local()
            .unwrap()
            .shared_buffers
            .get(&sender_id)
            .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
            .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;

        let receipt = envelope.receipt();
        let worktree_id = envelope.payload.worktree_id;
        let buffer_id = envelope.payload.buffer_id;
        let save = cx.spawn(|_, mut cx| async move {
            buffer.update(&mut cx, |buffer, cx| buffer.save(cx))?.await
        });

        // Reply off the main thread once the save finishes; failures are
        // logged by `log_err` rather than propagated.
        cx.background()
            .spawn(
                async move {
                    let (version, mtime) = save.await?;

                    rpc.respond(
                        receipt,
                        proto::BufferSaved {
                            worktree_id,
                            buffer_id,
                            version: (&version).into(),
                            mtime: Some(mtime.into()),
                        },
                    )
                    .await?;

                    Ok(())
                }
                .log_err(),
            )
            .detach();

        Ok(())
    }
502
503 pub fn handle_buffer_saved(
504 &mut self,
505 envelope: TypedEnvelope<proto::BufferSaved>,
506 _: Arc<rpc::Client>,
507 cx: &mut ModelContext<Self>,
508 ) -> Result<()> {
509 let payload = envelope.payload.clone();
510 let worktree = self.as_remote_mut().unwrap();
511 if let Some(buffer) = worktree
512 .open_buffers
513 .get(&(payload.buffer_id as usize))
514 .and_then(|buf| buf.upgrade(cx))
515 {
516 buffer.update(cx, |buffer, cx| {
517 let version = payload.version.try_into()?;
518 let mtime = payload
519 .mtime
520 .ok_or_else(|| anyhow!("missing mtime"))?
521 .into();
522 buffer.did_save(version, mtime, cx);
523 Result::<_, anyhow::Error>::Ok(())
524 })?;
525 }
526 Ok(())
527 }
528
    /// Refresh `self.snapshot` from the authoritative source (the scanner's
    /// shared snapshot for local trees, the watch channel for remote ones)
    /// and reconcile open buffers with it once scanning has settled.
    fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
        match self {
            Self::Local(worktree) => {
                let is_fake_fs = worktree.fs.is_fake();
                worktree.snapshot = worktree.background_snapshot.lock().clone();
                if worktree.is_scanning() {
                    // While a scan is in flight, schedule a re-poll instead of
                    // reconciling buffers against an incomplete snapshot.
                    if worktree.poll_task.is_none() {
                        worktree.poll_task = Some(cx.spawn(|this, mut cx| async move {
                            if is_fake_fs {
                                // Under the fake fs (tests), yield so the
                                // deterministic executor can make progress.
                                smol::future::yield_now().await;
                            } else {
                                smol::Timer::after(Duration::from_millis(100)).await;
                            }
                            this.update(&mut cx, |this, cx| {
                                this.as_local_mut().unwrap().poll_task = None;
                                this.poll_snapshot(cx);
                            })
                        }));
                    }
                } else {
                    worktree.poll_task.take();
                    self.update_open_buffers(cx);
                }
            }
            Self::Remote(worktree) => {
                worktree.snapshot = worktree.snapshot_rx.borrow().clone();
                self.update_open_buffers(cx);
            }
        };

        cx.notify();
    }
561
    /// Reconcile every open buffer's `File` metadata with the current
    /// snapshot: follow renames, pick up mtime changes (reloading clean
    /// buffers from disk), and mark buffers whose entries disappeared.
    /// Entries whose buffer models were dropped are pruned from the map.
    fn update_open_buffers(&mut self, cx: &mut ModelContext<Self>) {
        let open_buffers: Box<dyn Iterator<Item = _>> = match &self {
            Self::Local(worktree) => Box::new(worktree.open_buffers.iter()),
            Self::Remote(worktree) => {
                // Remote trees only track fully-loaded buffers here.
                Box::new(worktree.open_buffers.iter().filter_map(|(id, buf)| {
                    if let RemoteBuffer::Loaded(buf) = buf {
                        Some((id, buf))
                    } else {
                        None
                    }
                }))
            }
        };

        let mut buffers_to_delete = Vec::new();
        for (buffer_id, buffer) in open_buffers {
            if let Some(buffer) = buffer.upgrade(cx) {
                buffer.update(cx, |buffer, cx| {
                    let buffer_is_clean = !buffer.is_dirty();

                    if let Some(file) = buffer.file_mut() {
                        let mut file_changed = false;

                        if let Some(entry) = file
                            .entry_id
                            .and_then(|entry_id| self.entry_for_id(entry_id))
                        {
                            // Entry still exists: track renames...
                            if entry.path != file.path {
                                file.path = entry.path.clone();
                                file_changed = true;
                            }

                            // ...and on-disk modifications.
                            if entry.mtime != file.mtime {
                                file.mtime = entry.mtime;
                                file_changed = true;
                                // Only reload from disk when the buffer has no
                                // unsaved edits a reload could clobber.
                                if let Some(worktree) = self.as_local() {
                                    if buffer_is_clean {
                                        let abs_path = worktree.absolutize(&file.path);
                                        refresh_buffer(abs_path, &worktree.fs, cx);
                                    }
                                }
                            }
                        } else if let Some(entry) = self.entry_for_path(&file.path) {
                            // Old entry id is gone, but a file exists at the
                            // same path again (e.g. deleted then recreated).
                            file.entry_id = Some(entry.id);
                            file.mtime = entry.mtime;
                            if let Some(worktree) = self.as_local() {
                                if buffer_is_clean {
                                    let abs_path = worktree.absolutize(&file.path);
                                    refresh_buffer(abs_path, &worktree.fs, cx);
                                }
                            }
                            file_changed = true;
                        } else if !file.is_deleted() {
                            // The file vanished from the tree: mark a clean
                            // buffer dirty so its contents aren't lost.
                            if buffer_is_clean {
                                cx.emit(editor::buffer::Event::Dirtied);
                            }
                            file.entry_id = None;
                            file_changed = true;
                        }

                        if file_changed {
                            cx.emit(editor::buffer::Event::FileHandleChanged);
                        }
                    }
                });
            } else {
                // Defer removal to avoid mutating the map while iterating.
                buffers_to_delete.push(*buffer_id);
            }
        }

        for buffer_id in buffers_to_delete {
            match self {
                Self::Local(worktree) => {
                    worktree.open_buffers.remove(&buffer_id);
                }
                Self::Remote(worktree) => {
                    worktree.open_buffers.remove(&buffer_id);
                }
            }
        }
    }
643}
644
645impl Deref for Worktree {
646 type Target = Snapshot;
647
648 fn deref(&self) -> &Self::Target {
649 match self {
650 Worktree::Local(worktree) => &worktree.snapshot,
651 Worktree::Remote(worktree) => &worktree.snapshot,
652 }
653 }
654}
655
/// A worktree backed by the local filesystem, kept current by a background
/// scanner task.
pub struct LocalWorktree {
    // Foreground copy of the scanner's snapshot; refreshed by `poll_snapshot`.
    snapshot: Snapshot,
    // Snapshot mutated directly by the background scanner.
    background_snapshot: Arc<Mutex<Snapshot>>,
    // Most recent state reported by the scanner.
    last_scan_state_rx: watch::Receiver<ScanState>,
    // Keeps the scanner task alive for the worktree's lifetime.
    _background_scanner_task: Option<Task<()>>,
    // Pending re-poll scheduled while a scan is in progress.
    poll_task: Option<Task<()>>,
    // Present while this worktree is shared with guests over RPC.
    share: Option<ShareState>,
    // Weak handles to buffers opened from this tree, keyed by buffer id.
    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
    // Strong handles retained on behalf of each guest peer.
    shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
    peers: HashMap<PeerId, ReplicaId>,
    languages: Arc<LanguageRegistry>,
    // NOTE(review): not read or written in this chunk — presumably operations
    // queued for later transmission; confirm elsewhere in the file.
    queued_operations: Vec<(u64, Operation)>,
    fs: Arc<dyn Fs>,
}
670
671impl LocalWorktree {
    /// Construct the worktree model, seed its snapshot with the root entry,
    /// and start forwarding scan-state changes into the model. The scanner
    /// itself is started later by `Worktree::open_local`; it reports through
    /// the returned `Sender<ScanState>`.
    async fn new(
        path: impl Into<Arc<Path>>,
        languages: Arc<LanguageRegistry>,
        fs: Arc<dyn Fs>,
        cx: &mut AsyncAppContext,
    ) -> Result<(ModelHandle<Worktree>, Sender<ScanState>)> {
        let abs_path = path.into();
        // Within the worktree, the root entry is addressed by the empty path.
        let path: Arc<Path> = Arc::from(Path::new(""));
        let next_entry_id = AtomicUsize::new(0);

        // After determining whether the root entry is a file or a directory, populate the
        // snapshot's "root name", which will be used for the purpose of fuzzy matching.
        let root_name = abs_path
            .file_name()
            .map_or(String::new(), |f| f.to_string_lossy().to_string());
        let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
        let metadata = fs.metadata(&abs_path).await?;

        let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
        let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
        let tree = cx.add_model(move |cx: &mut ModelContext<Worktree>| {
            let mut snapshot = Snapshot {
                id: cx.model_id(),
                scan_id: 0,
                abs_path,
                root_name,
                root_char_bag,
                ignores: Default::default(),
                entries_by_path: Default::default(),
                entries_by_id: Default::default(),
                removed_entry_ids: Default::default(),
                next_entry_id: Arc::new(next_entry_id),
            };
            // `metadata` is `None` when nothing exists at the root path yet.
            if let Some(metadata) = metadata {
                snapshot.insert_entry(Entry::new(
                    path.into(),
                    &metadata,
                    &snapshot.next_entry_id,
                    snapshot.root_char_bag,
                ));
            }

            let tree = Self {
                snapshot: snapshot.clone(),
                background_snapshot: Arc::new(Mutex::new(snapshot)),
                last_scan_state_rx,
                _background_scanner_task: None,
                share: None,
                poll_task: None,
                open_buffers: Default::default(),
                shared_buffers: Default::default(),
                queued_operations: Default::default(),
                peers: Default::default(),
                languages,
                fs,
            };

            // Forward scanner progress into `last_scan_state_tx`, re-poll the
            // snapshot, and — when a scan completes while sharing — queue the
            // fresh snapshot for broadcast to guests.
            cx.spawn_weak(|this, mut cx| async move {
                while let Ok(scan_state) = scan_states_rx.recv().await {
                    if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
                        let to_send = handle.update(&mut cx, |this, cx| {
                            last_scan_state_tx.blocking_send(scan_state).ok();
                            this.poll_snapshot(cx);
                            let tree = this.as_local_mut().unwrap();
                            if !tree.is_scanning() {
                                if let Some(share) = tree.share.as_ref() {
                                    Some((tree.snapshot(), share.snapshots_tx.clone()))
                                } else {
                                    None
                                }
                            } else {
                                None
                            }
                        });

                        if let Some((snapshot, snapshots_to_send_tx)) = to_send {
                            if let Err(err) = snapshots_to_send_tx.send(snapshot).await {
                                log::error!("error submitting snapshot to send {}", err);
                            }
                        }
                    } else {
                        // The worktree model was dropped; stop listening.
                        break;
                    }
                }
            })
            .detach();

            Worktree::Local(tree)
        });

        Ok((tree, scan_states_tx))
    }
764
    /// Open a buffer for `path`, reusing an already-open model for the same
    /// file when possible. Dead weak handles are pruned along the way.
    pub fn open_buffer(
        &mut self,
        path: &Path,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        let handle = cx.handle();

        // If there is already a buffer for the given path, then return it.
        let mut existing_buffer = None;
        self.open_buffers.retain(|_buffer_id, buffer| {
            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                if let Some(file) = buffer.read(cx.as_ref()).file() {
                    if file.worktree_id() == handle.id() && file.path.as_ref() == path {
                        existing_buffer = Some(buffer);
                    }
                }
                true
            } else {
                // Drop map entries whose buffer models have been released.
                false
            }
        });

        let languages = self.languages.clone();
        let path = Arc::from(path);
        cx.spawn(|this, mut cx| async move {
            if let Some(existing_buffer) = existing_buffer {
                Ok(existing_buffer)
            } else {
                // Load from disk, select a language by path, then register the
                // new buffer under `open_buffers`.
                let (file, contents) = this
                    .update(&mut cx, |this, cx| this.as_local().unwrap().load(&path, cx))
                    .await?;
                let language = languages.select_language(&path).cloned();
                let buffer = cx.add_model(|cx| {
                    Buffer::from_history(0, History::new(contents.into()), Some(file), language, cx)
                });
                this.update(&mut cx, |this, _| {
                    let this = this
                        .as_local_mut()
                        .ok_or_else(|| anyhow!("must be a local worktree"))?;
                    this.open_buffers.insert(buffer.id(), buffer.downgrade());
                    Ok(buffer)
                })
            }
        })
    }
810
    /// Open a buffer on behalf of a guest peer, retain a strong handle for
    /// them in `shared_buffers`, and serialize the buffer for the response.
    pub fn open_remote_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::OpenBuffer>,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<proto::OpenBufferResponse>> {
        let peer_id = envelope.original_sender_id();
        let path = Path::new(&envelope.payload.path);

        let buffer = self.open_buffer(path, cx);

        cx.spawn(|this, mut cx| async move {
            let buffer = buffer.await?;
            this.update(&mut cx, |this, cx| {
                // Keep the buffer alive for the peer until they close it.
                this.as_local_mut()
                    .unwrap()
                    .shared_buffers
                    .entry(peer_id?)
                    .or_default()
                    .insert(buffer.id() as u64, buffer.clone());

                Ok(proto::OpenBufferResponse {
                    buffer: Some(buffer.update(cx.as_mut(), |buffer, cx| buffer.to_proto(cx))),
                })
            })
        })
    }
837
838 pub fn close_remote_buffer(
839 &mut self,
840 envelope: TypedEnvelope<proto::CloseBuffer>,
841 cx: &mut ModelContext<Worktree>,
842 ) -> Result<()> {
843 if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
844 shared_buffers.remove(&envelope.payload.buffer_id);
845 cx.notify();
846 }
847
848 Ok(())
849 }
850
851 pub fn add_peer(
852 &mut self,
853 envelope: TypedEnvelope<proto::AddPeer>,
854 cx: &mut ModelContext<Worktree>,
855 ) -> Result<()> {
856 let peer = envelope
857 .payload
858 .peer
859 .as_ref()
860 .ok_or_else(|| anyhow!("empty peer"))?;
861 self.peers
862 .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
863 cx.notify();
864
865 Ok(())
866 }
867
868 pub fn remove_peer(
869 &mut self,
870 envelope: TypedEnvelope<proto::RemovePeer>,
871 cx: &mut ModelContext<Worktree>,
872 ) -> Result<()> {
873 let peer_id = PeerId(envelope.payload.peer_id);
874 let replica_id = self
875 .peers
876 .remove(&peer_id)
877 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
878 self.shared_buffers.remove(&peer_id);
879 for (_, buffer) in &self.open_buffers {
880 if let Some(buffer) = buffer.upgrade(cx) {
881 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
882 }
883 }
884 cx.notify();
885
886 Ok(())
887 }
888
889 pub fn scan_complete(&self) -> impl Future<Output = ()> {
890 let mut scan_state_rx = self.last_scan_state_rx.clone();
891 async move {
892 let mut scan_state = Some(scan_state_rx.borrow().clone());
893 while let Some(ScanState::Scanning) = scan_state {
894 scan_state = scan_state_rx.recv().await;
895 }
896 }
897 }
898
899 fn is_scanning(&self) -> bool {
900 if let ScanState::Scanning = *self.last_scan_state_rx.borrow() {
901 true
902 } else {
903 false
904 }
905 }
906
    /// Clone of the current foreground snapshot.
    pub fn snapshot(&self) -> Snapshot {
        self.snapshot.clone()
    }
910
    /// Absolute path of the worktree root on disk.
    pub fn abs_path(&self) -> &Path {
        self.snapshot.abs_path.as_ref()
    }
914
    /// Whether the given absolute path lies inside this worktree's root.
    pub fn contains_abs_path(&self, path: &Path) -> bool {
        path.starts_with(&self.snapshot.abs_path)
    }
918
919 fn absolutize(&self, path: &Path) -> PathBuf {
920 if path.file_name().is_some() {
921 self.snapshot.abs_path.join(path)
922 } else {
923 self.snapshot.abs_path.to_path_buf()
924 }
925 }
926
    /// Read the file at `path` from disk, refresh its entry in the
    /// background snapshot, and return a `File` handle plus the contents.
    fn load(&self, path: &Path, cx: &mut ModelContext<Worktree>) -> Task<Result<(File, String)>> {
        let handle = cx.handle();
        let path = Arc::from(path);
        let abs_path = self.absolutize(&path);
        let background_snapshot = self.background_snapshot.clone();
        let fs = self.fs.clone();
        cx.spawn(|this, mut cx| async move {
            let text = fs.load(&abs_path).await?;
            // Eagerly populate the snapshot with an updated entry for the loaded file
            let entry = refresh_entry(fs.as_ref(), &background_snapshot, path, &abs_path).await?;
            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
            Ok((File::new(entry.id, handle, entry.path, entry.mtime), text))
        })
    }
941
    /// Write `text` to `path` and re-home `buffer` onto the resulting file,
    /// registering it among this worktree's open buffers.
    pub fn save_buffer_as(
        &self,
        buffer: ModelHandle<Buffer>,
        path: impl Into<Arc<Path>>,
        text: Rope,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<File>> {
        let save = self.save(path, text, cx);
        cx.spawn(|this, mut cx| async move {
            let entry = save.await?;
            this.update(&mut cx, |this, cx| {
                this.as_local_mut()
                    .unwrap()
                    .open_buffers
                    .insert(buffer.id(), buffer.downgrade());
                Ok(File::new(entry.id, cx.handle(), entry.path, entry.mtime))
            })
        })
    }
961
    /// Write `text` to disk at `path`, refresh the corresponding snapshot
    /// entry, and re-poll the snapshot once the write completes.
    fn save(
        &self,
        path: impl Into<Arc<Path>>,
        text: Rope,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<Entry>> {
        let path = path.into();
        let abs_path = self.absolutize(&path);
        let background_snapshot = self.background_snapshot.clone();
        let fs = self.fs.clone();
        // Do the filesystem work on the background executor.
        let save = cx.background().spawn(async move {
            fs.save(&abs_path, &text).await?;
            refresh_entry(fs.as_ref(), &background_snapshot, path.clone(), &abs_path).await
        });

        cx.spawn(|this, mut cx| async move {
            let entry = save.await?;
            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
            Ok(entry)
        })
    }
983
    /// Share this worktree through the server: upload the initial snapshot,
    /// start a task that streams snapshot diffs to guests, and register the
    /// RPC handlers that serve them. Returns the server-assigned worktree id
    /// and the access token guests use to join.
    pub fn share(
        &mut self,
        rpc: Arc<rpc::Client>,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<anyhow::Result<(u64, String)>> {
        let snapshot = self.snapshot();
        let share_request = self.share_request(cx);
        cx.spawn(|this, mut cx| async move {
            let share_request = share_request.await;
            let share_response = rpc.request(share_request).await?;
            let remote_id = share_response.worktree_id;

            log::info!("sharing worktree {:?}", share_response);
            let (snapshots_to_send_tx, snapshots_to_send_rx) =
                smol::channel::unbounded::<Snapshot>();

            // Diff each queued snapshot against the previously sent one and
            // push the delta to the server. On a send failure, keep the old
            // baseline so the next diff re-includes the lost changes.
            cx.background()
                .spawn({
                    let rpc = rpc.clone();
                    async move {
                        let mut prev_snapshot = snapshot;
                        while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
                            let message = snapshot.build_update(&prev_snapshot, remote_id);
                            match rpc.send(message).await {
                                Ok(()) => prev_snapshot = snapshot,
                                Err(err) => log::error!("error sending snapshot diff {}", err),
                            }
                        }
                    }
                })
                .detach();

            this.update(&mut cx, |worktree, cx| {
                // Handlers that serve guests; kept alive by `ShareState`.
                let _subscriptions = vec![
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_add_peer),
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_remove_peer),
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_open_buffer),
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_close_buffer),
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_update_buffer),
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_save_buffer),
                ];

                let worktree = worktree.as_local_mut().unwrap();
                worktree.share = Some(ShareState {
                    rpc,
                    remote_id: share_response.worktree_id,
                    snapshots_tx: snapshots_to_send_tx,
                    _subscriptions,
                });
            });

            Ok((remote_id, share_response.access_token))
        })
    }
1038
    /// Serialize the current snapshot into a `ShareWorktree` request, doing
    /// the per-entry conversion on the background executor.
    fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<proto::ShareWorktree> {
        let snapshot = self.snapshot();
        let root_name = self.root_name.clone();
        cx.background().spawn(async move {
            let entries = snapshot
                .entries_by_path
                .cursor::<(), ()>()
                .map(Into::into)
                .collect();
            proto::ShareWorktree {
                worktree: Some(proto::Worktree { root_name, entries }),
            }
        })
    }
1053}
1054
/// Reload `abs_path` from disk and replace the buffer's contents with the
/// result. Fire-and-forget: load errors are only logged.
pub fn refresh_buffer(abs_path: PathBuf, fs: &Arc<dyn Fs>, cx: &mut ModelContext<Buffer>) {
    let fs = fs.clone();
    cx.spawn(|buffer, mut cx| async move {
        let new_text = fs.load(&abs_path).await;
        match new_text {
            Err(error) => log::error!("error refreshing buffer after file changed: {}", error),
            Ok(new_text) => {
                buffer
                    .update(&mut cx, |buffer, cx| {
                        buffer.set_text_from_disk(new_text.into(), cx)
                    })
                    .await;
            }
        }
    })
    .detach()
}
1072
impl Deref for LocalWorktree {
    type Target = Snapshot;

    // Expose the foreground snapshot's query methods on the worktree itself.
    fn deref(&self) -> &Self::Target {
        &self.snapshot
    }
}
1080
1081impl fmt::Debug for LocalWorktree {
1082 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1083 self.snapshot.fmt(f)
1084 }
1085}
1086
/// State that exists only while a local worktree is shared with guests.
struct ShareState {
    rpc: Arc<rpc::Client>,
    // Id the server assigned to this worktree when sharing began.
    remote_id: u64,
    // Queue of snapshots to diff and broadcast to guests.
    snapshots_tx: Sender<Snapshot>,
    // Keeps the guest-serving RPC handlers registered while shared.
    _subscriptions: Vec<rpc::Subscription>,
}
1093
/// A mirror of a worktree hosted by another collaborator, kept in sync via
/// `UpdateWorktree` messages from the host.
pub struct RemoteWorktree {
    // Server-assigned id for this worktree.
    remote_id: u64,
    // Foreground copy of the latest synced snapshot.
    snapshot: Snapshot,
    // Publishes each new snapshot produced from host updates.
    snapshot_rx: watch::Receiver<Snapshot>,
    rpc: Arc<rpc::Client>,
    // Feeds host updates to the background task that owns the snapshot.
    updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
    // Replica id the server assigned to us for collaborative editing.
    replica_id: ReplicaId,
    // Buffers keyed by remote buffer id; entries may hold operations queued
    // for buffers that are still being opened.
    open_buffers: HashMap<usize, RemoteBuffer>,
    peers: HashMap<PeerId, ReplicaId>,
    languages: Arc<LanguageRegistry>,
    // NOTE(review): not read or written in this chunk — confirm its role
    // elsewhere in the file.
    queued_operations: Vec<(u64, Operation)>,
    // Keeps the RPC message handlers registered for the model's lifetime.
    _subscriptions: Vec<rpc::Subscription>,
}
1107
1108impl RemoteWorktree {
    /// Open a buffer by requesting it from the host, reusing an existing
    /// model when the file is already open locally. Operations that arrive
    /// while the request is in flight are queued and applied on completion.
    pub fn open_buffer(
        &mut self,
        path: &Path,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        let handle = cx.handle();
        // Look for an already-open buffer, pruning dead weak handles.
        let mut existing_buffer = None;
        self.open_buffers.retain(|_buffer_id, buffer| {
            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                if let Some(file) = buffer.read(cx.as_ref()).file() {
                    if file.worktree_id() == handle.id() && file.path.as_ref() == path {
                        existing_buffer = Some(buffer);
                    }
                }
                true
            } else {
                false
            }
        });

        let rpc = self.rpc.clone();
        let languages = self.languages.clone();
        let replica_id = self.replica_id;
        let remote_worktree_id = self.remote_id;
        let path = path.to_string_lossy().to_string();
        cx.spawn(|this, mut cx| async move {
            if let Some(existing_buffer) = existing_buffer {
                Ok(existing_buffer)
            } else {
                // The file must exist in our synced snapshot before we ask
                // the host for its contents.
                let entry = this
                    .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned())
                    .ok_or_else(|| anyhow!("file does not exist"))?;
                let file = File::new(entry.id, handle, entry.path, entry.mtime);
                let language = languages.select_language(&path).cloned();
                let response = rpc
                    .request(proto::OpenBuffer {
                        worktree_id: remote_worktree_id as u64,
                        path,
                    })
                    .await?;
                let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
                let buffer_id = remote_buffer.id as usize;
                let buffer = cx.add_model(|cx| {
                    Buffer::from_proto(replica_id, remote_buffer, Some(file), language, cx).unwrap()
                });
                this.update(&mut cx, |this, cx| {
                    let this = this.as_remote_mut().unwrap();
                    // Apply any operations that were queued for this buffer
                    // while the open request was outstanding.
                    if let Some(RemoteBuffer::Operations(pending_ops)) = this
                        .open_buffers
                        .insert(buffer_id, RemoteBuffer::Loaded(buffer.downgrade()))
                    {
                        buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?;
                    }
                    Result::<_, anyhow::Error>::Ok(())
                })?;
                Ok(buffer)
            }
        })
    }
1168
1169 fn snapshot(&self) -> Snapshot {
1170 self.snapshot.clone()
1171 }
1172
1173 fn update_from_remote(
1174 &mut self,
1175 envelope: TypedEnvelope<proto::UpdateWorktree>,
1176 cx: &mut ModelContext<Worktree>,
1177 ) -> Result<()> {
1178 let mut tx = self.updates_tx.clone();
1179 let payload = envelope.payload.clone();
1180 cx.background()
1181 .spawn(async move {
1182 tx.send(payload).await.expect("receiver runs to completion");
1183 })
1184 .detach();
1185
1186 Ok(())
1187 }
1188
1189 pub fn add_peer(
1190 &mut self,
1191 envelope: TypedEnvelope<proto::AddPeer>,
1192 cx: &mut ModelContext<Worktree>,
1193 ) -> Result<()> {
1194 let peer = envelope
1195 .payload
1196 .peer
1197 .as_ref()
1198 .ok_or_else(|| anyhow!("empty peer"))?;
1199 self.peers
1200 .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
1201 cx.notify();
1202 Ok(())
1203 }
1204
1205 pub fn remove_peer(
1206 &mut self,
1207 envelope: TypedEnvelope<proto::RemovePeer>,
1208 cx: &mut ModelContext<Worktree>,
1209 ) -> Result<()> {
1210 let peer_id = PeerId(envelope.payload.peer_id);
1211 let replica_id = self
1212 .peers
1213 .remove(&peer_id)
1214 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
1215 for (_, buffer) in &self.open_buffers {
1216 if let Some(buffer) = buffer.upgrade(cx) {
1217 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
1218 }
1219 }
1220 cx.notify();
1221 Ok(())
1222 }
1223}
1224
/// The state of a buffer belonging to a remote worktree.
enum RemoteBuffer {
    /// The buffer is still being opened; operations received in the meantime
    /// are buffered here and applied once it loads.
    Operations(Vec<Operation>),
    /// The buffer is loaded; held weakly so it can be dropped by its owner.
    Loaded(WeakModelHandle<Buffer>),
}
1229
1230impl RemoteBuffer {
1231 fn upgrade(&self, cx: &impl UpgradeModelHandle) -> Option<ModelHandle<Buffer>> {
1232 match self {
1233 Self::Operations(_) => None,
1234 Self::Loaded(buffer) => buffer.upgrade(cx),
1235 }
1236 }
1237}
1238
/// An immutable-ish view of a worktree's file hierarchy at a point in time.
///
/// Entries are stored twice: ordered by path (for traversal and prefix
/// queries) and ordered by id (for diffing between snapshots).
#[derive(Clone)]
pub struct Snapshot {
    /// Id of the worktree this snapshot belongs to.
    id: usize,
    /// Monotonically increasing counter, bumped on each rescan/update pass.
    scan_id: usize,
    /// Absolute path of the worktree root on disk.
    abs_path: Arc<Path>,
    root_name: String,
    /// Character bag of the root name, used as the base for fuzzy matching.
    root_char_bag: CharBag,
    /// Loaded `.gitignore` files keyed by the directory containing them,
    /// together with the scan id at which they were last (re)loaded.
    ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
    /// All entries, ordered by path.
    entries_by_path: SumTree<Entry>,
    /// All entries, ordered by id.
    entries_by_id: SumTree<PathEntry>,
    /// Maps inode -> id of a recently removed entry, enabling id reuse when
    /// the same file reappears (rename detection within one event batch).
    removed_entry_ids: HashMap<u64, usize>,
    /// Shared counter used to allocate ids for new entries.
    next_entry_id: Arc<AtomicUsize>,
}
1252
impl Snapshot {
    /// The id of the worktree this snapshot belongs to.
    pub fn id(&self) -> usize {
        self.id
    }

    /// Builds an `UpdateWorktree` message describing how to transform
    /// `other` into `self`.
    ///
    /// Both id-ordered entry trees are walked in lockstep: ids present only
    /// in `self` — or present in both with differing `scan_id`s — become
    /// updated entries, and ids present only in `other` become removals.
    pub fn build_update(&self, other: &Self, worktree_id: u64) -> proto::UpdateWorktree {
        let mut updated_entries = Vec::new();
        let mut removed_entries = Vec::new();
        let mut self_entries = self.entries_by_id.cursor::<(), ()>().peekable();
        let mut other_entries = other.entries_by_id.cursor::<(), ()>().peekable();
        loop {
            match (self_entries.peek(), other_entries.peek()) {
                (Some(self_entry), Some(other_entry)) => match self_entry.id.cmp(&other_entry.id) {
                    Ordering::Less => {
                        // Entry exists only in `self`: it is new.
                        let entry = self.entry_for_id(self_entry.id).unwrap().into();
                        updated_entries.push(entry);
                        self_entries.next();
                    }
                    Ordering::Equal => {
                        // Entry exists in both; resend it only if it was
                        // touched by a different scan.
                        if self_entry.scan_id != other_entry.scan_id {
                            let entry = self.entry_for_id(self_entry.id).unwrap().into();
                            updated_entries.push(entry);
                        }

                        self_entries.next();
                        other_entries.next();
                    }
                    Ordering::Greater => {
                        // Entry exists only in `other`: it was removed.
                        removed_entries.push(other_entry.id as u64);
                        other_entries.next();
                    }
                },
                (Some(self_entry), None) => {
                    let entry = self.entry_for_id(self_entry.id).unwrap().into();
                    updated_entries.push(entry);
                    self_entries.next();
                }
                (None, Some(other_entry)) => {
                    removed_entries.push(other_entry.id as u64);
                    other_entries.next();
                }
                (None, None) => break,
            }
        }

        proto::UpdateWorktree {
            updated_entries,
            removed_entries,
            worktree_id,
        }
    }

    /// Applies an update message (as produced by `build_update` on the other
    /// side) to this snapshot, bumping the local `scan_id` and editing both
    /// entry trees in a single batch.
    fn apply_update(&mut self, update: proto::UpdateWorktree) -> Result<()> {
        self.scan_id += 1;
        let scan_id = self.scan_id;

        let mut entries_by_path_edits = Vec::new();
        let mut entries_by_id_edits = Vec::new();
        for entry_id in update.removed_entries {
            let entry_id = entry_id as usize;
            let entry = self
                .entry_for_id(entry_id)
                .ok_or_else(|| anyhow!("unknown entry"))?;
            entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
            entries_by_id_edits.push(Edit::Remove(entry.id));
        }

        for entry in update.updated_entries {
            let entry = Entry::try_from((&self.root_char_bag, entry))?;
            // If this id already exists under a different path (e.g. a
            // rename), remove the stale path record first.
            if let Some(PathEntry { path, .. }) = self.entries_by_id.get(&entry.id, &()) {
                entries_by_path_edits.push(Edit::Remove(PathKey(path.clone())));
            }
            entries_by_id_edits.push(Edit::Insert(PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                scan_id,
            }));
            entries_by_path_edits.push(Edit::Insert(entry));
        }

        self.entries_by_path.edit(entries_by_path_edits, &());
        self.entries_by_id.edit(entries_by_id_edits, &());

        Ok(())
    }

    /// Total number of file entries in the tree.
    pub fn file_count(&self) -> usize {
        self.entries_by_path.summary().file_count
    }

    /// Number of file entries not hidden by a gitignore.
    pub fn visible_file_count(&self) -> usize {
        self.entries_by_path.summary().visible_file_count
    }

    /// Iterates over all files, starting at the `start`-th file.
    pub fn files(&self, start: usize) -> FileIter {
        FileIter::all(self, start)
    }

    /// Iterates over the paths of all entries, skipping the root entry
    /// (whose path is empty).
    pub fn paths(&self) -> impl Iterator<Item = &Arc<Path>> {
        let empty_path = Path::new("");
        self.entries_by_path
            .cursor::<(), ()>()
            .filter(move |entry| entry.path.as_ref() != empty_path)
            .map(|entry| &entry.path)
    }

    /// Iterates over non-ignored files, starting at the `start`-th one.
    pub fn visible_files(&self, start: usize) -> FileIter {
        FileIter::visible(self, start)
    }

    /// Iterates over the direct children of the directory at `path`.
    fn child_entries<'a>(&'a self, path: &'a Path) -> ChildEntriesIter<'a> {
        ChildEntriesIter::new(path, self)
    }

    /// The entry for the worktree root, if it has been scanned yet.
    pub fn root_entry(&self) -> Option<&Entry> {
        self.entry_for_path("")
    }

    /// The file name of the worktree root.
    pub fn root_name(&self) -> &str {
        &self.root_name
    }

    /// Looks up the entry at exactly `path` (relative to the root).
    fn entry_for_path(&self, path: impl AsRef<Path>) -> Option<&Entry> {
        let mut cursor = self.entries_by_path.cursor::<_, ()>();
        if cursor.seek(&PathSearch::Exact(path.as_ref()), Bias::Left, &()) {
            cursor.item()
        } else {
            None
        }
    }

    /// Looks up an entry by id, going through the id -> path index.
    fn entry_for_id(&self, id: usize) -> Option<&Entry> {
        let entry = self.entries_by_id.get(&id, &())?;
        self.entry_for_path(&entry.path)
    }

    /// The inode of the entry at `path`, if any.
    pub fn inode_for_path(&self, path: impl AsRef<Path>) -> Option<u64> {
        self.entry_for_path(path.as_ref()).map(|e| e.inode)
    }

    /// Inserts (or replaces) a single entry in both trees, returning the
    /// entry as stored (its id may have been remapped by `reuse_entry_id`).
    ///
    /// If the entry is a `.gitignore` file, the ignore file is (re)loaded
    /// and recorded against its parent directory with the current scan id.
    fn insert_entry(&mut self, mut entry: Entry) -> Entry {
        if !entry.is_dir() && entry.path.file_name() == Some(&GITIGNORE) {
            let (ignore, err) = Gitignore::new(self.abs_path.join(&entry.path));
            if let Some(err) = err {
                // A partially-invalid gitignore still yields usable rules;
                // log and continue.
                log::error!("error in ignore file {:?} - {:?}", &entry.path, err);
            }

            let ignore_dir_path = entry.path.parent().unwrap();
            self.ignores
                .insert(ignore_dir_path.into(), (Arc::new(ignore), self.scan_id));
        }

        self.reuse_entry_id(&mut entry);
        self.entries_by_path.insert_or_replace(entry.clone(), &());
        self.entries_by_id.insert_or_replace(
            PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                scan_id: self.scan_id,
            },
            &(),
        );
        entry
    }

    /// Fills in the children of a directory that was previously inserted as
    /// `PendingDir`, marking it `Dir` and batch-inserting its entries.
    ///
    /// `ignore` is the `.gitignore` found in this directory, if any.
    fn populate_dir(
        &mut self,
        parent_path: Arc<Path>,
        entries: impl IntoIterator<Item = Entry>,
        ignore: Option<Arc<Gitignore>>,
    ) {
        let mut parent_entry = self
            .entries_by_path
            .get(&PathKey(parent_path.clone()), &())
            .unwrap()
            .clone();
        if let Some(ignore) = ignore {
            self.ignores.insert(parent_path, (ignore, self.scan_id));
        }
        if matches!(parent_entry.kind, EntryKind::PendingDir) {
            parent_entry.kind = EntryKind::Dir;
        } else {
            // Callers only populate directories that were inserted as
            // pending; anything else indicates a logic error.
            unreachable!();
        }

        let mut entries_by_path_edits = vec![Edit::Insert(parent_entry)];
        let mut entries_by_id_edits = Vec::new();

        for mut entry in entries {
            self.reuse_entry_id(&mut entry);
            entries_by_id_edits.push(Edit::Insert(PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                scan_id: self.scan_id,
            }));
            entries_by_path_edits.push(Edit::Insert(entry));
        }

        self.entries_by_path.edit(entries_by_path_edits, &());
        self.entries_by_id.edit(entries_by_id_edits, &());
    }

    /// Keeps entry ids stable across changes: prefer the id of a recently
    /// removed entry with the same inode (rename/move detection), otherwise
    /// the id of an existing entry at the same path.
    fn reuse_entry_id(&mut self, entry: &mut Entry) {
        if let Some(removed_entry_id) = self.removed_entry_ids.remove(&entry.inode) {
            entry.id = removed_entry_id;
        } else if let Some(existing_entry) = self.entry_for_path(&entry.path) {
            entry.id = existing_entry.id;
        }
    }

    /// Removes `path` and its entire subtree from both trees, recording the
    /// removed ids by inode so a re-appearance of the same inode can reuse
    /// its old id.
    fn remove_path(&mut self, path: &Path) {
        let mut new_entries;
        let removed_entry_ids;
        {
            let mut cursor = self.entries_by_path.cursor::<_, ()>();
            // Keep everything before `path`, slice out [path, successor),
            // then append everything after the subtree.
            new_entries = cursor.slice(&PathSearch::Exact(path), Bias::Left, &());
            removed_entry_ids = cursor.slice(&PathSearch::Successor(path), Bias::Left, &());
            new_entries.push_tree(cursor.suffix(&()), &());
        }
        self.entries_by_path = new_entries;

        let mut entries_by_id_edits = Vec::new();
        for entry in removed_entry_ids.cursor::<(), ()>() {
            // If multiple removed entries share an inode, remember the
            // largest (most recent) id for reuse.
            let removed_entry_id = self
                .removed_entry_ids
                .entry(entry.inode)
                .or_insert(entry.id);
            *removed_entry_id = cmp::max(*removed_entry_id, entry.id);
            entries_by_id_edits.push(Edit::Remove(entry.id));
        }
        self.entries_by_id.edit(entries_by_id_edits, &());

        // Removing a `.gitignore` invalidates the parent directory's cached
        // ignore: mark it touched so it gets reprocessed this scan.
        if path.file_name() == Some(&GITIGNORE) {
            if let Some((_, scan_id)) = self.ignores.get_mut(path.parent().unwrap()) {
                *scan_id = self.scan_id;
            }
        }
    }

    /// Builds the stack of `.gitignore` files governing `path`, walking its
    /// ancestors from the root downward.
    ///
    /// Short-circuits to `IgnoreStack::all()` as soon as an ancestor (or the
    /// path itself) is determined to be ignored.
    fn ignore_stack_for_path(&self, path: &Path, is_dir: bool) -> Arc<IgnoreStack> {
        let mut new_ignores = Vec::new();
        for ancestor in path.ancestors().skip(1) {
            if let Some((ignore, _)) = self.ignores.get(ancestor) {
                new_ignores.push((ancestor, Some(ignore.clone())));
            } else {
                new_ignores.push((ancestor, None));
            }
        }

        let mut ignore_stack = IgnoreStack::none();
        for (parent_path, ignore) in new_ignores.into_iter().rev() {
            if ignore_stack.is_path_ignored(&parent_path, true) {
                ignore_stack = IgnoreStack::all();
                break;
            } else if let Some(ignore) = ignore {
                ignore_stack = ignore_stack.append(Arc::from(parent_path), ignore);
            }
        }

        if ignore_stack.is_path_ignored(path, is_dir) {
            ignore_stack = IgnoreStack::all();
        }

        ignore_stack
    }
}
1519
1520impl fmt::Debug for Snapshot {
1521 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1522 for entry in self.entries_by_path.cursor::<(), ()>() {
1523 for _ in entry.path.ancestors().skip(1) {
1524 write!(f, " ")?;
1525 }
1526 writeln!(f, "{:?} (inode: {})", entry.path, entry.inode)?;
1527 }
1528 Ok(())
1529 }
1530}
1531
/// A handle to a file within a worktree, as seen by an open buffer.
#[derive(Clone, PartialEq)]
pub struct File {
    /// Id of the worktree entry backing this file; `None` once the file has
    /// been deleted on disk.
    entry_id: Option<usize>,
    /// The worktree (local or remote) that owns this file.
    worktree: ModelHandle<Worktree>,
    /// Path of the file relative to the worktree root.
    pub path: Arc<Path>,
    /// Modification time captured when the entry was last scanned.
    pub mtime: SystemTime,
}
1539
impl File {
    /// Creates a handle for a file that currently exists (has an entry).
    pub fn new(
        entry_id: usize,
        worktree: ModelHandle<Worktree>,
        path: Arc<Path>,
        mtime: SystemTime,
    ) -> Self {
        Self {
            entry_id: Some(entry_id),
            worktree,
            path,
            mtime,
        }
    }

    /// Forwards a buffer edit to collaborators over RPC (when the worktree is
    /// shared or remote). If the send fails, the operation is queued on the
    /// worktree for later retransmission.
    pub fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) {
        self.worktree.update(cx, |worktree, cx| {
            // Only send when there is an RPC peer: a local worktree must be
            // shared; a remote worktree always has one.
            if let Some((rpc, remote_id)) = match worktree {
                Worktree::Local(worktree) => worktree
                    .share
                    .as_ref()
                    .map(|share| (share.rpc.clone(), share.remote_id)),
                Worktree::Remote(worktree) => Some((worktree.rpc.clone(), worktree.remote_id)),
            } {
                cx.spawn(|worktree, mut cx| async move {
                    if let Err(error) = rpc
                        .request(proto::UpdateBuffer {
                            worktree_id: remote_id,
                            buffer_id,
                            operations: vec![(&operation).into()],
                        })
                        .await
                    {
                        // Keep the operation so it isn't lost; it can be
                        // resent from `queued_operations` later.
                        worktree.update(&mut cx, |worktree, _| {
                            log::error!("error sending buffer operation: {}", error);
                            match worktree {
                                Worktree::Local(t) => &mut t.queued_operations,
                                Worktree::Remote(t) => &mut t.queued_operations,
                            }
                            .push((buffer_id, operation));
                        });
                    }
                })
                .detach();
            }
        });
    }

    /// Notifies the remote host that this buffer was closed locally. This is
    /// a best-effort, fire-and-forget message; failures are only logged.
    pub fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) {
        self.worktree.update(cx, |worktree, cx| {
            if let Worktree::Remote(worktree) = worktree {
                let worktree_id = worktree.remote_id;
                let rpc = worktree.rpc.clone();
                cx.background()
                    .spawn(async move {
                        if let Err(error) = rpc
                            .send(proto::CloseBuffer {
                                worktree_id,
                                buffer_id,
                            })
                            .await
                        {
                            log::error!("error closing remote buffer: {}", error);
                        }
                    })
                    .detach();
            }
        });
    }

    /// Returns this file's path relative to the root of its worktree.
    pub fn path(&self) -> Arc<Path> {
        self.path.clone()
    }

    /// Returns this file's absolute path on disk (worktree root + path).
    pub fn abs_path(&self, cx: &AppContext) -> PathBuf {
        self.worktree.read(cx).abs_path.join(&self.path)
    }

    /// Returns the last component of this handle's absolute path. If this handle refers to the root
    /// of its worktree, then this method will return the name of the worktree itself.
    pub fn file_name<'a>(&'a self, cx: &'a AppContext) -> Option<OsString> {
        self.path
            .file_name()
            .or_else(|| Some(OsStr::new(self.worktree.read(cx).root_name())))
            .map(Into::into)
    }

    /// Whether the underlying file has been deleted from the worktree.
    pub fn is_deleted(&self) -> bool {
        self.entry_id.is_none()
    }

    /// Whether the underlying file still exists in the worktree.
    pub fn exists(&self) -> bool {
        !self.is_deleted()
    }

    /// Saves the buffer's contents.
    ///
    /// For a local worktree, writes `text` to disk and — if the worktree is
    /// shared — notifies collaborators with a `BufferSaved` message. For a
    /// remote worktree, asks the host to save via a `SaveBuffer` request.
    /// Resolves to the saved version vector and the file's new mtime.
    pub fn save(
        &self,
        buffer_id: u64,
        text: Rope,
        version: time::Global,
        cx: &mut MutableAppContext,
    ) -> Task<Result<(time::Global, SystemTime)>> {
        self.worktree.update(cx, |worktree, cx| match worktree {
            Worktree::Local(worktree) => {
                let rpc = worktree
                    .share
                    .as_ref()
                    .map(|share| (share.rpc.clone(), share.remote_id));
                let save = worktree.save(self.path.clone(), text, cx);
                cx.background().spawn(async move {
                    let entry = save.await?;
                    if let Some((rpc, worktree_id)) = rpc {
                        rpc.send(proto::BufferSaved {
                            worktree_id,
                            buffer_id,
                            version: (&version).into(),
                            mtime: Some(entry.mtime.into()),
                        })
                        .await?;
                    }
                    Ok((version, entry.mtime))
                })
            }
            Worktree::Remote(worktree) => {
                let rpc = worktree.rpc.clone();
                let worktree_id = worktree.remote_id;
                cx.foreground().spawn(async move {
                    let response = rpc
                        .request(proto::SaveBuffer {
                            worktree_id,
                            buffer_id,
                        })
                        .await?;
                    let version = response.version.try_into()?;
                    let mtime = response
                        .mtime
                        .ok_or_else(|| anyhow!("missing mtime"))?
                        .into();
                    Ok((version, mtime))
                })
            }
        })
    }

    /// Id of the worktree containing this file.
    pub fn worktree_id(&self) -> usize {
        self.worktree.id()
    }

    /// A (worktree id, path) pair identifying this file.
    pub fn entry_id(&self) -> (usize, Arc<Path>) {
        (self.worktree.id(), self.path.clone())
    }
}
1693
/// A single file or directory within a worktree snapshot.
#[derive(Clone, Debug)]
pub struct Entry {
    /// Stable identifier, preserved across renames when possible.
    pub id: usize,
    /// Whether this is a file, a scanned directory, or a pending directory.
    pub kind: EntryKind,
    /// Path relative to the worktree root.
    pub path: Arc<Path>,
    pub inode: u64,
    pub mtime: SystemTime,
    pub is_symlink: bool,
    /// Whether this entry is excluded by a `.gitignore` in scope.
    pub is_ignored: bool,
}
1704
/// Discriminates entry types in the worktree tree.
#[derive(Clone, Debug)]
pub enum EntryKind {
    /// A directory whose children have not been scanned yet.
    PendingDir,
    /// A fully scanned directory.
    Dir,
    /// A file, with the character bag used for fuzzy path matching.
    File(CharBag),
}
1711
1712impl Entry {
1713 fn new(
1714 path: Arc<Path>,
1715 metadata: &fs::Metadata,
1716 next_entry_id: &AtomicUsize,
1717 root_char_bag: CharBag,
1718 ) -> Self {
1719 Self {
1720 id: next_entry_id.fetch_add(1, SeqCst),
1721 kind: if metadata.is_dir {
1722 EntryKind::PendingDir
1723 } else {
1724 EntryKind::File(char_bag_for_path(root_char_bag, &path))
1725 },
1726 path,
1727 inode: metadata.inode,
1728 mtime: metadata.mtime,
1729 is_symlink: metadata.is_symlink,
1730 is_ignored: false,
1731 }
1732 }
1733
1734 pub fn is_dir(&self) -> bool {
1735 matches!(self.kind, EntryKind::Dir | EntryKind::PendingDir)
1736 }
1737
1738 pub fn is_file(&self) -> bool {
1739 matches!(self.kind, EntryKind::File(_))
1740 }
1741}
1742
1743impl sum_tree::Item for Entry {
1744 type Summary = EntrySummary;
1745
1746 fn summary(&self) -> Self::Summary {
1747 let file_count;
1748 let visible_file_count;
1749 if self.is_file() {
1750 file_count = 1;
1751 if self.is_ignored {
1752 visible_file_count = 0;
1753 } else {
1754 visible_file_count = 1;
1755 }
1756 } else {
1757 file_count = 0;
1758 visible_file_count = 0;
1759 }
1760
1761 EntrySummary {
1762 max_path: self.path.clone(),
1763 file_count,
1764 visible_file_count,
1765 }
1766 }
1767}
1768
impl sum_tree::KeyedItem for Entry {
    type Key = PathKey;

    /// Entries are keyed (and thus ordered) by their path.
    fn key(&self) -> Self::Key {
        PathKey(self.path.clone())
    }
}
1776
/// Aggregated statistics for a subtree of `entries_by_path`.
#[derive(Clone, Debug)]
pub struct EntrySummary {
    /// The greatest (right-most) path in the summarized subtree.
    max_path: Arc<Path>,
    /// Number of file entries in the subtree.
    file_count: usize,
    /// Number of non-ignored file entries in the subtree.
    visible_file_count: usize,
}
1783
1784impl Default for EntrySummary {
1785 fn default() -> Self {
1786 Self {
1787 max_path: Arc::from(Path::new("")),
1788 file_count: 0,
1789 visible_file_count: 0,
1790 }
1791 }
1792}
1793
impl sum_tree::Summary for EntrySummary {
    type Context = ();

    /// Combines a right-hand summary into this one: counts accumulate, and
    /// the right-hand `max_path` wins because it is the greater one.
    fn add_summary(&mut self, rhs: &Self, _: &()) {
        self.max_path = rhs.max_path.clone();
        self.file_count += rhs.file_count;
        self.visible_file_count += rhs.visible_file_count;
    }
}
1803
/// An id-ordered index record pointing at an `Entry` by path.
#[derive(Clone, Debug)]
struct PathEntry {
    id: usize,
    path: Arc<Path>,
    /// The scan during which this entry was last inserted/updated; used by
    /// `build_update` to detect changed entries between snapshots.
    scan_id: usize,
}
1810
impl sum_tree::Item for PathEntry {
    type Summary = PathEntrySummary;

    /// The summary tracks the maximum id in a subtree.
    fn summary(&self) -> Self::Summary {
        PathEntrySummary { max_id: self.id }
    }
}
1818
impl sum_tree::KeyedItem for PathEntry {
    type Key = usize;

    /// Path entries are keyed (and thus ordered) by entry id.
    fn key(&self) -> Self::Key {
        self.id
    }
}
1826
/// Summary for the id-ordered tree: the largest entry id in a subtree.
#[derive(Clone, Debug, Default)]
struct PathEntrySummary {
    max_id: usize,
}
1831
impl sum_tree::Summary for PathEntrySummary {
    type Context = ();

    /// The right-hand summary's max id wins, since ids increase rightward.
    fn add_summary(&mut self, summary: &Self, _: &Self::Context) {
        self.max_id = summary.max_id;
    }
}
1839
/// Lets cursors over the id-ordered tree seek directly by entry id.
impl<'a> sum_tree::Dimension<'a, PathEntrySummary> for usize {
    fn add_summary(&mut self, summary: &'a PathEntrySummary, _: &()) {
        *self = summary.max_id;
    }
}
1845
/// Newtype ordering key: entries sort by their relative path.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct PathKey(Arc<Path>);
1848
impl Default for PathKey {
    /// The default key is the empty path (the worktree root).
    fn default() -> Self {
        Self(Path::new("").into())
    }
}
1854
/// Lets cursors over the path-ordered tree seek by exact path key.
impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.0 = summary.max_path.clone();
    }
}
1860
/// A seek target for path-ordered cursors.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum PathSearch<'a> {
    /// The position of exactly this path.
    Exact(&'a Path),
    /// The position immediately after every path within this path's subtree.
    Successor(&'a Path),
}
1866
1867impl<'a> Ord for PathSearch<'a> {
1868 fn cmp(&self, other: &Self) -> cmp::Ordering {
1869 match (self, other) {
1870 (Self::Exact(a), Self::Exact(b)) => a.cmp(b),
1871 (Self::Successor(a), Self::Exact(b)) => {
1872 if b.starts_with(a) {
1873 cmp::Ordering::Greater
1874 } else {
1875 a.cmp(b)
1876 }
1877 }
1878 _ => unreachable!("not sure we need the other two cases"),
1879 }
1880 }
1881}
1882
impl<'a> PartialOrd for PathSearch<'a> {
    /// Delegates to `Ord`, as the ordering is total for the cases used.
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}
1888
1889impl<'a> Default for PathSearch<'a> {
1890 fn default() -> Self {
1891 Self::Exact(Path::new("").into())
1892 }
1893}
1894
/// Lets cursors seek by `PathSearch`: the accumulated dimension is always an
/// `Exact` position at the subtree's greatest path.
impl<'a: 'b, 'b> sum_tree::Dimension<'a, EntrySummary> for PathSearch<'b> {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        *self = Self::Exact(summary.max_path.as_ref());
    }
}
1900
/// Cursor dimension counting file entries, used to seek to the n-th file.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct FileCount(usize);

impl<'a> sum_tree::Dimension<'a, EntrySummary> for FileCount {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.0 += summary.file_count;
    }
}
1909
/// Cursor dimension counting non-ignored file entries, used to seek to the
/// n-th visible file.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct VisibleFileCount(usize);

impl<'a> sum_tree::Dimension<'a, EntrySummary> for VisibleFileCount {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.0 += summary.visible_file_count;
    }
}
1918
/// Scans a worktree's directory contents on background threads and keeps the
/// shared `Snapshot` up to date, reporting progress through `notify`.
struct BackgroundScanner {
    fs: Arc<dyn Fs>,
    /// Snapshot shared with the foreground worktree; mutated under the lock.
    snapshot: Arc<Mutex<Snapshot>>,
    /// Channel used to publish Scanning/Idle/Err state transitions.
    notify: Sender<ScanState>,
    executor: Arc<executor::Background>,
}
1925
impl BackgroundScanner {
    fn new(
        snapshot: Arc<Mutex<Snapshot>>,
        notify: Sender<ScanState>,
        fs: Arc<dyn Fs>,
        executor: Arc<executor::Background>,
    ) -> Self {
        Self {
            fs,
            snapshot,
            notify,
            executor,
        }
    }

    /// The absolute path of the worktree root.
    fn abs_path(&self) -> Arc<Path> {
        self.snapshot.lock().abs_path.clone()
    }

    /// A clone of the current shared snapshot.
    fn snapshot(&self) -> Snapshot {
        self.snapshot.lock().clone()
    }

    /// Main loop: performs the initial scan, then processes each batch of
    /// file-system events, bracketing every unit of work with
    /// `Scanning`/`Idle` notifications. Returns when the notification
    /// receiver is dropped or the event stream ends.
    async fn run(mut self, events_rx: impl Stream<Item = Vec<fsevent::Event>>) {
        if self.notify.send(ScanState::Scanning).await.is_err() {
            return;
        }

        if let Err(err) = self.scan_dirs().await {
            if self
                .notify
                .send(ScanState::Err(Arc::new(err)))
                .await
                .is_err()
            {
                return;
            }
        }

        if self.notify.send(ScanState::Idle).await.is_err() {
            return;
        }

        futures::pin_mut!(events_rx);
        while let Some(events) = events_rx.next().await {
            if self.notify.send(ScanState::Scanning).await.is_err() {
                break;
            }

            if !self.process_events(events).await {
                break;
            }

            if self.notify.send(ScanState::Idle).await.is_err() {
                break;
            }
        }
    }

    /// Performs the initial recursive scan of the worktree root, fanning the
    /// work out across one task per CPU fed from a shared job queue.
    async fn scan_dirs(&mut self) -> Result<()> {
        let root_char_bag;
        let next_entry_id;
        let is_dir;
        {
            let snapshot = self.snapshot.lock();
            root_char_bag = snapshot.root_char_bag;
            next_entry_id = snapshot.next_entry_id.clone();
            is_dir = snapshot.root_entry().map_or(false, |e| e.is_dir())
        };

        if is_dir {
            let path: Arc<Path> = Arc::from(Path::new(""));
            let abs_path = self.abs_path();
            let (tx, rx) = channel::unbounded();
            // Seed the queue with the root directory; each scanned directory
            // enqueues its subdirectories via the cloned sender.
            tx.send(ScanJob {
                abs_path: abs_path.to_path_buf(),
                path,
                ignore_stack: IgnoreStack::none(),
                scan_queue: tx.clone(),
            })
            .await
            .unwrap();
            // Drop our sender so the workers' `recv` loops terminate once
            // all in-flight jobs (and their clones) are done.
            drop(tx);

            self.executor
                .scoped(|scope| {
                    for _ in 0..self.executor.num_cpus() {
                        scope.spawn(async {
                            while let Ok(job) = rx.recv().await {
                                if let Err(err) = self
                                    .scan_dir(root_char_bag, next_entry_id.clone(), &job)
                                    .await
                                {
                                    log::error!("error scanning {:?}: {}", job.abs_path, err);
                                }
                            }
                        });
                    }
                })
                .await;
        }

        Ok(())
    }

    /// Scans a single directory: reads its children, computes their ignore
    /// status, populates the snapshot, and enqueues subdirectories.
    async fn scan_dir(
        &self,
        root_char_bag: CharBag,
        next_entry_id: Arc<AtomicUsize>,
        job: &ScanJob,
    ) -> Result<()> {
        let mut new_entries: Vec<Entry> = Vec::new();
        let mut new_jobs: Vec<ScanJob> = Vec::new();
        let mut ignore_stack = job.ignore_stack.clone();
        let mut new_ignore = None;

        let mut child_paths = self.fs.read_dir(&job.abs_path).await?;
        while let Some(child_abs_path) = child_paths.next().await {
            let child_abs_path = match child_abs_path {
                Ok(child_abs_path) => child_abs_path,
                Err(error) => {
                    log::error!("error processing entry {:?}", error);
                    continue;
                }
            };
            let child_name = child_abs_path.file_name().unwrap();
            let child_path: Arc<Path> = job.path.join(child_name).into();
            let child_metadata = match self.fs.metadata(&child_abs_path).await? {
                Some(metadata) => metadata,
                // The child disappeared between listing and stat; skip it.
                None => continue,
            };

            // If we find a .gitignore, add it to the stack of ignores used to determine which paths are ignored
            if child_name == *GITIGNORE {
                let (ignore, err) = Gitignore::new(&child_abs_path);
                if let Some(err) = err {
                    log::error!("error in ignore file {:?} - {:?}", child_name, err);
                }
                let ignore = Arc::new(ignore);
                ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
                new_ignore = Some(ignore);

                // Re-evaluate the ignore status of any child entries we've
                // already processed, so they reflect the ignore file found in
                // this directory. Because `.gitignore` starts with a `.`, it
                // tends to appear early in the listing, so there should
                // rarely be many such entries. Also update the ignore stacks
                // of scan jobs already queued for subdirectories —
                // `new_jobs` parallels the directory entries in
                // `new_entries`, so advancing it once per directory entry
                // keeps the two in sync.
                let mut new_jobs = new_jobs.iter_mut();
                for entry in &mut new_entries {
                    entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
                    if entry.is_dir() {
                        new_jobs.next().unwrap().ignore_stack = if entry.is_ignored {
                            IgnoreStack::all()
                        } else {
                            ignore_stack.clone()
                        };
                    }
                }
            }

            let mut child_entry = Entry::new(
                child_path.clone(),
                &child_metadata,
                &next_entry_id,
                root_char_bag,
            );

            if child_metadata.is_dir {
                let is_ignored = ignore_stack.is_path_ignored(&child_path, true);
                child_entry.is_ignored = is_ignored;
                new_entries.push(child_entry);
                new_jobs.push(ScanJob {
                    abs_path: child_abs_path,
                    path: child_path,
                    // Once a directory is ignored, everything below it is.
                    ignore_stack: if is_ignored {
                        IgnoreStack::all()
                    } else {
                        ignore_stack.clone()
                    },
                    scan_queue: job.scan_queue.clone(),
                });
            } else {
                child_entry.is_ignored = ignore_stack.is_path_ignored(&child_path, false);
                new_entries.push(child_entry);
            };
        }

        self.snapshot
            .lock()
            .populate_dir(job.path.clone(), new_entries, new_ignore);
        for new_job in new_jobs {
            job.scan_queue.send(new_job).await.unwrap();
        }

        Ok(())
    }

    /// Applies a batch of file-system events to the snapshot: removes the
    /// affected subtrees, re-stats each event path, and rescans any
    /// directories that (re)appeared. Returns `false` if the root path can
    /// no longer be canonicalized, which terminates the event loop.
    async fn process_events(&mut self, mut events: Vec<fsevent::Event>) -> bool {
        // Work on a local copy of the snapshot; publish it once the
        // synchronous edits are done.
        let mut snapshot = self.snapshot();
        snapshot.scan_id += 1;

        let root_abs_path = if let Ok(abs_path) = self.fs.canonicalize(&snapshot.abs_path).await {
            abs_path
        } else {
            return false;
        };
        let root_char_bag = snapshot.root_char_bag;
        let next_entry_id = snapshot.next_entry_id.clone();

        // Sorting parents before children lets dedup_by collapse events for
        // paths already covered by an ancestor's event.
        events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
        events.dedup_by(|a, b| a.path.starts_with(&b.path));

        // First pass: drop every affected subtree from the snapshot.
        for event in &events {
            match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => snapshot.remove_path(&path),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            }
        }

        // Second pass: re-stat each event path and reinsert whatever still
        // exists, queueing directories for a recursive rescan.
        let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
        for event in events {
            let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => Arc::from(path.to_path_buf()),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            };

            match self.fs.metadata(&event.path).await {
                Ok(Some(metadata)) => {
                    let ignore_stack = snapshot.ignore_stack_for_path(&path, metadata.is_dir);
                    let mut fs_entry = Entry::new(
                        path.clone(),
                        &metadata,
                        snapshot.next_entry_id.as_ref(),
                        snapshot.root_char_bag,
                    );
                    fs_entry.is_ignored = ignore_stack.is_all();
                    snapshot.insert_entry(fs_entry);
                    if metadata.is_dir {
                        scan_queue_tx
                            .send(ScanJob {
                                abs_path: event.path,
                                path,
                                ignore_stack,
                                scan_queue: scan_queue_tx.clone(),
                            })
                            .await
                            .unwrap();
                    }
                }
                // Path no longer exists: the removal above already handled it.
                Ok(None) => {}
                Err(err) => {
                    // TODO - create a special 'error' entry in the entries tree to mark this
                    log::error!("error reading file on event {:?}", err);
                }
            }
        }

        *self.snapshot.lock() = snapshot;

        // Scan any directories that were created as part of this event batch.
        drop(scan_queue_tx);
        self.executor
            .scoped(|scope| {
                for _ in 0..self.executor.num_cpus() {
                    scope.spawn(async {
                        while let Ok(job) = scan_queue_rx.recv().await {
                            if let Err(err) = self
                                .scan_dir(root_char_bag, next_entry_id.clone(), &job)
                                .await
                            {
                                log::error!("error scanning {:?}: {}", job.abs_path, err);
                            }
                        }
                    });
                }
            })
            .await;

        // Attempt to detect renames only over a single batch of file-system events.
        self.snapshot.lock().removed_entry_ids.clear();

        self.update_ignore_statuses().await;
        true
    }

    /// Recomputes `is_ignored` for subtrees whose governing `.gitignore`
    /// files changed during the current scan, and drops cached ignores whose
    /// files no longer exist.
    async fn update_ignore_statuses(&self) {
        let mut snapshot = self.snapshot();

        let mut ignores_to_update = Vec::new();
        let mut ignores_to_delete = Vec::new();
        for (parent_path, (_, scan_id)) in &snapshot.ignores {
            // Touched during this scan and its directory still exists.
            if *scan_id == snapshot.scan_id && snapshot.entry_for_path(parent_path).is_some() {
                ignores_to_update.push(parent_path.clone());
            }

            let ignore_path = parent_path.join(&*GITIGNORE);
            if snapshot.entry_for_path(ignore_path).is_none() {
                ignores_to_delete.push(parent_path.clone());
            }
        }

        for parent_path in ignores_to_delete {
            snapshot.ignores.remove(&parent_path);
            self.snapshot.lock().ignores.remove(&parent_path);
        }

        let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
        ignores_to_update.sort_unstable();
        let mut ignores_to_update = ignores_to_update.into_iter().peekable();
        while let Some(parent_path) = ignores_to_update.next() {
            // Skip descendants of a directory already scheduled; the job
            // recurses into them anyway.
            while ignores_to_update
                .peek()
                .map_or(false, |p| p.starts_with(&parent_path))
            {
                ignores_to_update.next().unwrap();
            }

            let ignore_stack = snapshot.ignore_stack_for_path(&parent_path, true);
            ignore_queue_tx
                .send(UpdateIgnoreStatusJob {
                    path: parent_path,
                    ignore_stack,
                    ignore_queue: ignore_queue_tx.clone(),
                })
                .await
                .unwrap();
        }
        drop(ignore_queue_tx);

        self.executor
            .scoped(|scope| {
                for _ in 0..self.executor.num_cpus() {
                    scope.spawn(async {
                        while let Ok(job) = ignore_queue_rx.recv().await {
                            self.update_ignore_status(job, &snapshot).await;
                        }
                    });
                }
            })
            .await;
    }

    /// Recomputes `is_ignored` for the direct children of one directory,
    /// writing back only the entries whose status actually changed, and
    /// enqueues child directories for recursive processing.
    async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) {
        let mut ignore_stack = job.ignore_stack;
        if let Some((ignore, _)) = snapshot.ignores.get(&job.path) {
            ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
        }

        let mut edits = Vec::new();
        for mut entry in snapshot.child_entries(&job.path).cloned() {
            let was_ignored = entry.is_ignored;
            entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
            if entry.is_dir() {
                let child_ignore_stack = if entry.is_ignored {
                    IgnoreStack::all()
                } else {
                    ignore_stack.clone()
                };
                job.ignore_queue
                    .send(UpdateIgnoreStatusJob {
                        path: entry.path.clone(),
                        ignore_stack: child_ignore_stack,
                        ignore_queue: job.ignore_queue.clone(),
                    })
                    .await
                    .unwrap();
            }

            if entry.is_ignored != was_ignored {
                edits.push(Edit::Insert(entry));
            }
        }
        self.snapshot.lock().entries_by_path.edit(edits, &());
    }
}
2314
2315async fn refresh_entry(
2316 fs: &dyn Fs,
2317 snapshot: &Mutex<Snapshot>,
2318 path: Arc<Path>,
2319 abs_path: &Path,
2320) -> Result<Entry> {
2321 let root_char_bag;
2322 let next_entry_id;
2323 {
2324 let snapshot = snapshot.lock();
2325 root_char_bag = snapshot.root_char_bag;
2326 next_entry_id = snapshot.next_entry_id.clone();
2327 }
2328 let entry = Entry::new(
2329 path,
2330 &fs.metadata(abs_path)
2331 .await?
2332 .ok_or_else(|| anyhow!("could not read saved file metadata"))?,
2333 &next_entry_id,
2334 root_char_bag,
2335 );
2336 Ok(snapshot.lock().insert_entry(entry))
2337}
2338
2339fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
2340 let mut result = root_char_bag;
2341 result.extend(
2342 path.to_string_lossy()
2343 .chars()
2344 .map(|c| c.to_ascii_lowercase()),
2345 );
2346 result
2347}
2348
/// A unit of work for the background directory scan: a single directory to
/// read, plus the gitignore state inherited from its ancestors.
struct ScanJob {
    // Absolute path of the directory to scan.
    abs_path: PathBuf,
    // The directory's path relative to the worktree root.
    path: Arc<Path>,
    // Gitignore rules in effect for entries inside this directory.
    ignore_stack: Arc<IgnoreStack>,
    // Used to enqueue jobs for subdirectories discovered during the scan.
    scan_queue: Sender<ScanJob>,
}
2355
/// A unit of work for re-evaluating ignore statuses: one directory whose
/// children should be re-checked against the given ignore stack.
struct UpdateIgnoreStatusJob {
    // Worktree-relative path of the directory to process.
    path: Arc<Path>,
    // Gitignore rules inherited from the directory's ancestors.
    ignore_stack: Arc<IgnoreStack>,
    // Used to enqueue follow-up jobs for child directories.
    ignore_queue: Sender<UpdateIgnoreStatusJob>,
}
2361
/// Test-support extension methods on a worktree model handle.
pub trait WorktreeHandle {
    /// Mutates the worktree's directory on disk and waits until those
    /// mutations are reflected in the worktree, ensuring that any stale FS
    /// events queued before the call have been fully processed.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()>;
}
2369
impl WorktreeHandle for ModelHandle<Worktree> {
    // The worktree's FS event stream sometimes delivers "redundant" events for FS changes that
    // occurred before the worktree was constructed. These events can cause the worktree to perform
    // extra directory scans, and emit extra scan-state notifications.
    //
    // This function mutates the worktree's directory and waits for those mutations to be picked up,
    // to ensure that all redundant FS events have already been processed.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()> {
        use smol::future::FutureExt;

        // Create and then delete a sentinel file, waiting for each change to
        // show up in the worktree; finally wait for any in-flight scan to end.
        let filename = "fs-event-sentinel";
        let root_path = cx.read(|cx| self.read(cx).abs_path.clone());
        let tree = self.clone();
        async move {
            std::fs::write(root_path.join(filename), "").unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_some())
                .await;

            std::fs::remove_file(root_path.join(filename)).unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_none())
                .await;

            cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
                .await;
        }
        .boxed_local()
    }
}
2402
/// Iterator over a snapshot's file entries in path order, either over every
/// file or only the visible (non-ignored) ones.
pub enum FileIter<'a> {
    All(Cursor<'a, Entry, FileCount, ()>),
    Visible(Cursor<'a, Entry, VisibleFileCount, ()>),
}
2407
2408impl<'a> FileIter<'a> {
2409 fn all(snapshot: &'a Snapshot, start: usize) -> Self {
2410 let mut cursor = snapshot.entries_by_path.cursor();
2411 cursor.seek(&FileCount(start), Bias::Right, &());
2412 Self::All(cursor)
2413 }
2414
2415 fn visible(snapshot: &'a Snapshot, start: usize) -> Self {
2416 let mut cursor = snapshot.entries_by_path.cursor();
2417 cursor.seek(&VisibleFileCount(start), Bias::Right, &());
2418 Self::Visible(cursor)
2419 }
2420
2421 fn next_internal(&mut self) {
2422 match self {
2423 Self::All(cursor) => {
2424 let ix = *cursor.seek_start();
2425 cursor.seek_forward(&FileCount(ix.0 + 1), Bias::Right, &());
2426 }
2427 Self::Visible(cursor) => {
2428 let ix = *cursor.seek_start();
2429 cursor.seek_forward(&VisibleFileCount(ix.0 + 1), Bias::Right, &());
2430 }
2431 }
2432 }
2433
2434 fn item(&self) -> Option<&'a Entry> {
2435 match self {
2436 Self::All(cursor) => cursor.item(),
2437 Self::Visible(cursor) => cursor.item(),
2438 }
2439 }
2440}
2441
2442impl<'a> Iterator for FileIter<'a> {
2443 type Item = &'a Entry;
2444
2445 fn next(&mut self) -> Option<Self::Item> {
2446 if let Some(entry) = self.item() {
2447 self.next_internal();
2448 Some(entry)
2449 } else {
2450 None
2451 }
2452 }
2453}
2454
/// Iterator over the immediate children of a directory entry in a snapshot.
struct ChildEntriesIter<'a> {
    // The directory whose children are being enumerated.
    parent_path: &'a Path,
    // Cursor positioned just past `parent_path`; advanced one child at a time.
    cursor: Cursor<'a, Entry, PathSearch<'a>, ()>,
}
2459
2460impl<'a> ChildEntriesIter<'a> {
2461 fn new(parent_path: &'a Path, snapshot: &'a Snapshot) -> Self {
2462 let mut cursor = snapshot.entries_by_path.cursor();
2463 cursor.seek(&PathSearch::Exact(parent_path), Bias::Right, &());
2464 Self {
2465 parent_path,
2466 cursor,
2467 }
2468 }
2469}
2470
2471impl<'a> Iterator for ChildEntriesIter<'a> {
2472 type Item = &'a Entry;
2473
2474 fn next(&mut self) -> Option<Self::Item> {
2475 if let Some(item) = self.cursor.item() {
2476 if item.path.starts_with(self.parent_path) {
2477 self.cursor
2478 .seek_forward(&PathSearch::Successor(&item.path), Bias::Left, &());
2479 Some(item)
2480 } else {
2481 None
2482 }
2483 } else {
2484 None
2485 }
2486 }
2487}
2488
2489impl<'a> From<&'a Entry> for proto::Entry {
2490 fn from(entry: &'a Entry) -> Self {
2491 Self {
2492 id: entry.id as u64,
2493 is_dir: entry.is_dir(),
2494 path: entry.path.to_string_lossy().to_string(),
2495 inode: entry.inode,
2496 mtime: Some(entry.mtime.into()),
2497 is_symlink: entry.is_symlink,
2498 is_ignored: entry.is_ignored,
2499 }
2500 }
2501}
2502
2503impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
2504 type Error = anyhow::Error;
2505
2506 fn try_from((root_char_bag, entry): (&'a CharBag, proto::Entry)) -> Result<Self> {
2507 if let Some(mtime) = entry.mtime {
2508 let kind = if entry.is_dir {
2509 EntryKind::Dir
2510 } else {
2511 let mut char_bag = root_char_bag.clone();
2512 char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase()));
2513 EntryKind::File(char_bag)
2514 };
2515 let path: Arc<Path> = Arc::from(Path::new(&entry.path));
2516 Ok(Entry {
2517 id: entry.id as usize,
2518 kind,
2519 path: path.clone(),
2520 inode: entry.inode,
2521 mtime: mtime.into(),
2522 is_symlink: entry.is_symlink,
2523 is_ignored: entry.is_ignored,
2524 })
2525 } else {
2526 Err(anyhow!(
2527 "missing mtime in remote worktree entry {:?}",
2528 entry.path
2529 ))
2530 }
2531 }
2532}
2533
2534#[cfg(test)]
2535mod tests {
2536 use super::*;
2537 use crate::test::*;
2538 use anyhow::Result;
2539 use fs::RealFs;
2540 use rand::prelude::*;
2541 use serde_json::json;
2542 use std::time::UNIX_EPOCH;
2543 use std::{env, fmt::Write, os::unix, time::SystemTime};
2544
    #[gpui::test]
    async fn test_populate_and_search(cx: gpui::TestAppContext) {
        // Build a small tree with nested directories and files.
        let dir = temp_tree(json!({
            "root": {
                "apple": "",
                "banana": {
                    "carrot": {
                        "date": "",
                        "endive": "",
                    }
                },
                "fennel": {
                    "grape": "",
                }
            }
        }));

        // Add symlinks: the worktree is opened through a link to the root, and
        // "finnochio" links to a sibling directory inside the tree.
        let root_link_path = dir.path().join("root_link");
        unix::fs::symlink(&dir.path().join("root"), &root_link_path).unwrap();
        unix::fs::symlink(
            &dir.path().join("root/fennel"),
            &dir.path().join("root/finnochio"),
        )
        .unwrap();

        let tree = Worktree::open_local(
            root_link_path,
            Default::default(),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        let snapshots = [cx.read(|cx| {
            let tree = tree.read(cx);
            // 4 regular files plus the file reachable through the "finnochio"
            // symlink; both routes should resolve to the same inode.
            assert_eq!(tree.file_count(), 5);
            assert_eq!(
                tree.inode_for_path("fennel/grape"),
                tree.inode_for_path("finnochio/grape")
            );
            tree.snapshot()
        })];
        // Fuzzy-match "bna": only paths under "banana" should match.
        let cancel_flag = Default::default();
        let results = cx
            .read(|cx| {
                match_paths(
                    &snapshots,
                    "bna",
                    false,
                    false,
                    10,
                    &cancel_flag,
                    cx.background().clone(),
                )
            })
            .await;
        assert_eq!(
            results
                .into_iter()
                .map(|result| result.path)
                .collect::<Vec<Arc<Path>>>(),
            vec![
                PathBuf::from("banana/carrot/date").into(),
                PathBuf::from("banana/carrot/endive").into(),
            ]
        );
    }
2615
    #[gpui::test]
    async fn test_search_worktree_without_files(cx: gpui::TestAppContext) {
        // A tree consisting solely of (nested) empty directories.
        let dir = temp_tree(json!({
            "root": {
                "dir1": {},
                "dir2": {
                    "dir3": {}
                }
            }
        }));
        let tree = Worktree::open_local(
            dir.path(),
            Default::default(),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        let snapshots = [cx.read(|cx| {
            let tree = tree.read(cx);
            assert_eq!(tree.file_count(), 0);
            tree.snapshot()
        })];
        // Matching operates on files only, so a query matching the directory
        // names must still yield no results.
        let cancel_flag = Default::default();
        let results = cx
            .read(|cx| {
                match_paths(
                    &snapshots,
                    "dir",
                    false,
                    false,
                    10,
                    &cancel_flag,
                    cx.background().clone(),
                )
            })
            .await;
        assert_eq!(
            results
                .into_iter()
                .map(|result| result.path)
                .collect::<Vec<Arc<Path>>>(),
            vec![]
        );
    }
2664
    #[gpui::test]
    async fn test_save_file(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));
        let tree = Worktree::open_local(
            dir.path(),
            Arc::new(LanguageRegistry::new()),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        // Prepend a large body of text (~160 KB) and save the buffer to disk.
        let save = buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        // The file on disk should now match the buffer contents exactly.
        let new_text = std::fs::read_to_string(dir.path().join("file1")).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
2691
    #[gpui::test]
    async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));
        let file_path = dir.path().join("file1");

        // Open the worktree rooted at the file itself, not at a directory.
        let tree = Worktree::open_local(
            file_path.clone(),
            Arc::new(LanguageRegistry::new()),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        cx.read(|cx| assert_eq!(tree.read(cx).file_count(), 1));

        // In a single-file worktree the file is the root entry, addressed by
        // the empty relative path.
        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("", cx))
            .await
            .unwrap();
        let save = buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        // The file on disk should now match the buffer contents exactly.
        let new_text = std::fs::read_to_string(file_path).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
2724
    #[gpui::test]
    async fn test_rescan_and_remote_updates(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "a": {
                "file1": "",
                "file2": "",
                "file3": "",
            },
            "b": {
                "c": {
                    "file4": "",
                    "file5": "",
                }
            }
        }));

        let tree = Worktree::open_local(
            dir.path(),
            Default::default(),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        // Helpers: open a buffer for a path, and look up an entry's stable id.
        let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| {
            let buffer = tree.update(cx, |tree, cx| tree.open_buffer(path, cx));
            async move { buffer.await.unwrap() }
        };
        let id_for_path = |path: &'static str, cx: &gpui::TestAppContext| {
            tree.read_with(cx, |tree, _| {
                tree.entry_for_path(path)
                    .expect(&format!("no entry for path {}", path))
                    .id
            })
        };

        let buffer2 = buffer_for_path("a/file2", &mut cx).await;
        let buffer3 = buffer_for_path("a/file3", &mut cx).await;
        let buffer4 = buffer_for_path("b/c/file4", &mut cx).await;
        let buffer5 = buffer_for_path("b/c/file5", &mut cx).await;

        let file2_id = id_for_path("a/file2", &cx);
        let file3_id = id_for_path("a/file3", &cx);
        let file4_id = id_for_path("b/c/file4", &cx);

        // Wait for the initial scan.
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;

        // Create a remote copy of this worktree.
        let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
        let worktree_id = 1;
        let share_request = tree
            .update(&mut cx, |tree, cx| {
                tree.as_local().unwrap().share_request(cx)
            })
            .await;
        let remote = Worktree::remote(
            proto::OpenWorktreeResponse {
                worktree_id,
                worktree: share_request.worktree,
                replica_id: 1,
                peers: Vec::new(),
            },
            rpc::Client::new(),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        cx.read(|cx| {
            assert!(!buffer2.read(cx).is_dirty());
            assert!(!buffer3.read(cx).is_dirty());
            assert!(!buffer4.read(cx).is_dirty());
            assert!(!buffer5.read(cx).is_dirty());
        });

        // Rename and delete files and directories.
        tree.flush_fs_events(&cx).await;
        std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
        std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
        std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap();
        std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
        tree.flush_fs_events(&cx).await;

        let expected_paths = vec![
            "a",
            "a/file1",
            "a/file2.new",
            "b",
            "d",
            "d/file3",
            "d/file4",
        ];

        cx.read(|app| {
            assert_eq!(
                tree.read(app)
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );

            // Entry ids must be preserved across renames/moves.
            assert_eq!(id_for_path("a/file2.new", &cx), file2_id);
            assert_eq!(id_for_path("d/file3", &cx), file3_id);
            assert_eq!(id_for_path("d/file4", &cx), file4_id);

            // Open buffers follow their files through renames...
            assert_eq!(
                buffer2.read(app).file().unwrap().path().as_ref(),
                Path::new("a/file2.new")
            );
            assert_eq!(
                buffer3.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file3")
            );
            assert_eq!(
                buffer4.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file4")
            );
            assert_eq!(
                buffer5.read(app).file().unwrap().path().as_ref(),
                Path::new("b/c/file5")
            );

            // ...and only the buffer for the removed file is marked deleted.
            assert!(!buffer2.read(app).file().unwrap().is_deleted());
            assert!(!buffer3.read(app).file().unwrap().is_deleted());
            assert!(!buffer4.read(app).file().unwrap().is_deleted());
            assert!(buffer5.read(app).file().unwrap().is_deleted());
        });

        // Update the remote worktree. Check that it becomes consistent with the
        // local worktree.
        remote.update(&mut cx, |remote, cx| {
            let update_message = tree
                .read(cx)
                .snapshot()
                .build_update(&initial_snapshot, worktree_id);
            remote
                .as_remote_mut()
                .unwrap()
                .snapshot
                .apply_update(update_message)
                .unwrap();

            assert_eq!(
                remote
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );
        });
    }
2881
    #[gpui::test]
    async fn test_rescan_with_gitignore(cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            ".git": {},
            ".gitignore": "ignored-dir\n",
            "tracked-dir": {
                "tracked-file1": "tracked contents",
            },
            "ignored-dir": {
                "ignored-file1": "ignored contents",
            }
        }));

        let tree = Worktree::open_local(
            dir.path(),
            Default::default(),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        tree.flush_fs_events(&cx).await;
        // After the initial scan, entry statuses reflect the .gitignore.
        cx.read(|cx| {
            let tree = tree.read(cx);
            let tracked = tree.entry_for_path("tracked-dir/tracked-file1").unwrap();
            let ignored = tree.entry_for_path("ignored-dir/ignored-file1").unwrap();
            assert_eq!(tracked.is_ignored, false);
            assert_eq!(ignored.is_ignored, true);
        });

        // Files created after the scan inherit the ignore status of their
        // parent directory; the .git directory itself is flagged as ignored.
        std::fs::write(dir.path().join("tracked-dir/tracked-file2"), "").unwrap();
        std::fs::write(dir.path().join("ignored-dir/ignored-file2"), "").unwrap();
        tree.flush_fs_events(&cx).await;
        cx.read(|cx| {
            let tree = tree.read(cx);
            let dot_git = tree.entry_for_path(".git").unwrap();
            let tracked = tree.entry_for_path("tracked-dir/tracked-file2").unwrap();
            let ignored = tree.entry_for_path("ignored-dir/ignored-file2").unwrap();
            assert_eq!(tracked.is_ignored, false);
            assert_eq!(ignored.is_ignored, true);
            assert_eq!(dot_git.is_ignored, true);
        });
    }
2927
    #[gpui::test(iterations = 100)]
    fn test_random(mut rng: StdRng) {
        // Fuzz sizes are overridable via env vars for longer local runs.
        let operations = env::var("OPERATIONS")
            .map(|o| o.parse().unwrap())
            .unwrap_or(40);
        let initial_entries = env::var("INITIAL_ENTRIES")
            .map(|o| o.parse().unwrap())
            .unwrap_or(20);

        let root_dir = tempdir::TempDir::new("worktree-test").unwrap();
        for _ in 0..initial_entries {
            randomly_mutate_tree(root_dir.path(), 1.0, &mut rng).unwrap();
        }
        log::info!("Generated initial tree");

        let (notify_tx, _notify_rx) = smol::channel::unbounded();
        let fs = Arc::new(RealFs);
        let next_entry_id = Arc::new(AtomicUsize::new(0));
        // Seed a snapshot containing only the root entry; the scanner fills in
        // the rest.
        let mut initial_snapshot = Snapshot {
            id: 0,
            scan_id: 0,
            abs_path: root_dir.path().into(),
            entries_by_path: Default::default(),
            entries_by_id: Default::default(),
            removed_entry_ids: Default::default(),
            ignores: Default::default(),
            root_name: Default::default(),
            root_char_bag: Default::default(),
            next_entry_id: next_entry_id.clone(),
        };
        initial_snapshot.insert_entry(Entry::new(
            Path::new("").into(),
            &smol::block_on(fs.metadata(root_dir.path()))
                .unwrap()
                .unwrap(),
            &next_entry_id,
            Default::default(),
        ));
        let mut scanner = BackgroundScanner::new(
            Arc::new(Mutex::new(initial_snapshot.clone())),
            notify_tx,
            fs.clone(),
            Arc::new(gpui::executor::Background::new()),
        );
        smol::block_on(scanner.scan_dirs()).unwrap();
        scanner.snapshot().check_invariants();

        // Interleave random FS mutations with delivery of random-sized batches
        // of the resulting events, checking invariants after each batch.
        let mut events = Vec::new();
        let mut mutations_len = operations;
        while mutations_len > 1 {
            if !events.is_empty() && rng.gen_bool(0.4) {
                let len = rng.gen_range(0..=events.len());
                let to_deliver = events.drain(0..len).collect::<Vec<_>>();
                log::info!("Delivering events: {:#?}", to_deliver);
                smol::block_on(scanner.process_events(to_deliver));
                scanner.snapshot().check_invariants();
            } else {
                events.extend(randomly_mutate_tree(root_dir.path(), 0.6, &mut rng).unwrap());
                mutations_len -= 1;
            }
        }
        log::info!("Quiescing: {:#?}", events);
        smol::block_on(scanner.process_events(events));
        scanner.snapshot().check_invariants();

        // A fresh scan of the final directory state must converge to the same
        // snapshot as the incrementally-updated scanner.
        let (notify_tx, _notify_rx) = smol::channel::unbounded();
        let mut new_scanner = BackgroundScanner::new(
            Arc::new(Mutex::new(initial_snapshot)),
            notify_tx,
            scanner.fs.clone(),
            scanner.executor.clone(),
        );
        smol::block_on(new_scanner.scan_dirs()).unwrap();
        assert_eq!(scanner.snapshot().to_vec(), new_scanner.snapshot().to_vec());
    }
3003
    /// Applies one random filesystem mutation beneath `root_path` — creating
    /// an entry, writing a .gitignore, renaming, or deleting — and returns
    /// synthetic FS events for the paths that were touched.
    /// `insertion_probability` biases the choice toward creating new entries.
    fn randomly_mutate_tree(
        root_path: &Path,
        insertion_probability: f64,
        rng: &mut impl Rng,
    ) -> Result<Vec<fsevent::Event>> {
        let root_path = root_path.canonicalize().unwrap();
        let (dirs, files) = read_dir_recursive(root_path.clone());

        let mut events = Vec::new();
        let mut record_event = |path: PathBuf| {
            events.push(fsevent::Event {
                // Event ids only need to be roughly monotonic for these tests,
                // so the current UNIX time in seconds is good enough.
                event_id: SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                flags: fsevent::StreamFlags::empty(),
                path,
            });
        };

        // Always insert when the tree is effectively empty (only the root
        // directory), so there is something to mutate in later calls.
        if (files.is_empty() && dirs.len() == 1) || rng.gen_bool(insertion_probability) {
            let path = dirs.choose(rng).unwrap();
            let new_path = path.join(gen_name(rng));

            if rng.gen() {
                log::info!("Creating dir {:?}", new_path.strip_prefix(root_path)?);
                std::fs::create_dir(&new_path)?;
            } else {
                log::info!("Creating file {:?}", new_path.strip_prefix(root_path)?);
                std::fs::write(&new_path, "")?;
            }
            record_event(new_path);
        } else if rng.gen_bool(0.05) {
            // Occasionally (re)write a .gitignore listing a random subset of
            // the chosen directory's descendants.
            let ignore_dir_path = dirs.choose(rng).unwrap();
            let ignore_path = ignore_dir_path.join(&*GITIGNORE);

            let (subdirs, subfiles) = read_dir_recursive(ignore_dir_path.clone());
            let files_to_ignore = {
                let len = rng.gen_range(0..=subfiles.len());
                subfiles.choose_multiple(rng, len)
            };
            let dirs_to_ignore = {
                // NOTE: the exclusive range here (vs inclusive for files)
                // means at least one subdir always stays unignored —
                // presumably deliberate, since subdirs[0] is the directory
                // containing the .gitignore itself.
                let len = rng.gen_range(0..subdirs.len());
                subdirs.choose_multiple(rng, len)
            };

            let mut ignore_contents = String::new();
            for path_to_ignore in files_to_ignore.chain(dirs_to_ignore) {
                write!(
                    ignore_contents,
                    "{}\n",
                    path_to_ignore
                        .strip_prefix(&ignore_dir_path)?
                        .to_str()
                        .unwrap()
                )
                .unwrap();
            }
            log::info!(
                "Creating {:?} with contents:\n{}",
                ignore_path.strip_prefix(&root_path)?,
                ignore_contents
            );
            std::fs::write(&ignore_path, ignore_contents).unwrap();
            record_event(ignore_path);
        } else {
            // Pick an existing file or non-root directory to rename or delete.
            let old_path = {
                let file_path = files.choose(rng);
                let dir_path = dirs[1..].choose(rng);
                file_path.into_iter().chain(dir_path).choose(rng).unwrap()
            };

            let is_rename = rng.gen();
            if is_rename {
                // Move it beneath a directory that isn't its own descendant,
                // sometimes replacing an existing directory wholesale.
                let new_path_parent = dirs
                    .iter()
                    .filter(|d| !d.starts_with(old_path))
                    .choose(rng)
                    .unwrap();

                let overwrite_existing_dir =
                    !old_path.starts_with(&new_path_parent) && rng.gen_bool(0.3);
                let new_path = if overwrite_existing_dir {
                    std::fs::remove_dir_all(&new_path_parent).ok();
                    new_path_parent.to_path_buf()
                } else {
                    new_path_parent.join(gen_name(rng))
                };

                log::info!(
                    "Renaming {:?} to {}{:?}",
                    old_path.strip_prefix(&root_path)?,
                    if overwrite_existing_dir {
                        "overwrite "
                    } else {
                        ""
                    },
                    new_path.strip_prefix(&root_path)?
                );
                std::fs::rename(&old_path, &new_path)?;
                record_event(old_path.clone());
                record_event(new_path);
            } else if old_path.is_dir() {
                let (dirs, files) = read_dir_recursive(old_path.clone());

                log::info!("Deleting dir {:?}", old_path.strip_prefix(&root_path)?);
                std::fs::remove_dir_all(&old_path).unwrap();
                // Emit an event for every removed descendant, files first.
                for file in files {
                    record_event(file);
                }
                for dir in dirs {
                    record_event(dir);
                }
            } else {
                log::info!("Deleting file {:?}", old_path.strip_prefix(&root_path)?);
                std::fs::remove_file(old_path).unwrap();
                record_event(old_path.clone());
            }
        }

        Ok(events)
    }
3126
3127 fn read_dir_recursive(path: PathBuf) -> (Vec<PathBuf>, Vec<PathBuf>) {
3128 let child_entries = std::fs::read_dir(&path).unwrap();
3129 let mut dirs = vec![path];
3130 let mut files = Vec::new();
3131 for child_entry in child_entries {
3132 let child_path = child_entry.unwrap().path();
3133 if child_path.is_dir() {
3134 let (child_dirs, child_files) = read_dir_recursive(child_path);
3135 dirs.extend(child_dirs);
3136 files.extend(child_files);
3137 } else {
3138 files.push(child_path);
3139 }
3140 }
3141 (dirs, files)
3142 }
3143
3144 fn gen_name(rng: &mut impl Rng) -> String {
3145 (0..6)
3146 .map(|_| rng.sample(rand::distributions::Alphanumeric))
3147 .map(char::from)
3148 .collect()
3149 }
3150
    impl Snapshot {
        /// Asserts the internal consistency of the snapshot: the file
        /// iterators agree with the entry tree, child traversal reproduces the
        /// tree's path order, and every registered ignore file still exists.
        fn check_invariants(&self) {
            // The all-files and visible-files iterators must yield exactly the
            // file entries (respectively, the non-ignored ones) in tree order.
            let mut files = self.files(0);
            let mut visible_files = self.visible_files(0);
            for entry in self.entries_by_path.cursor::<(), ()>() {
                if entry.is_file() {
                    assert_eq!(files.next().unwrap().inode, entry.inode);
                    if !entry.is_ignored {
                        assert_eq!(visible_files.next().unwrap().inode, entry.inode);
                    }
                }
            }
            assert!(files.next().is_none());
            assert!(visible_files.next().is_none());

            // Traverse the tree via child_entries, inserting each node's
            // children at a fixed stack position so they are visited in order;
            // the resulting path sequence must match iterating the entry tree
            // directly.
            let mut bfs_paths = Vec::new();
            let mut stack = vec![Path::new("")];
            while let Some(path) = stack.pop() {
                bfs_paths.push(path);
                let ix = stack.len();
                for child_entry in self.child_entries(path) {
                    stack.insert(ix, &child_entry.path);
                }
            }

            let dfs_paths = self
                .entries_by_path
                .cursor::<(), ()>()
                .map(|e| e.path.as_ref())
                .collect::<Vec<_>>();
            assert_eq!(bfs_paths, dfs_paths);

            // Every directory with a registered gitignore must still exist and
            // still contain its .gitignore file.
            for (ignore_parent_path, _) in &self.ignores {
                assert!(self.entry_for_path(ignore_parent_path).is_some());
                assert!(self
                    .entry_for_path(ignore_parent_path.join(&*GITIGNORE))
                    .is_some());
            }
        }

        /// Flattens the snapshot into `(path, inode, is_ignored)` tuples
        /// sorted by path, for whole-snapshot equality comparisons in tests.
        fn to_vec(&self) -> Vec<(&Path, u64, bool)> {
            let mut paths = Vec::new();
            for entry in self.entries_by_path.cursor::<(), ()>() {
                paths.push((entry.path.as_ref(), entry.inode, entry.is_ignored));
            }
            paths.sort_by(|a, b| a.0.cmp(&b.0));
            paths
        }
    }
3200}