1mod ignore;
2
3use self::ignore::IgnoreStack;
4use crate::{
5 editor::{self, Buffer, History, Operation, Rope},
6 fs::{self, Fs},
7 fuzzy,
8 fuzzy::CharBag,
9 language::{Language, LanguageRegistry},
10 rpc::{self, proto},
11 time::{self, ReplicaId},
12 util::{Bias, TryFutureExt},
13};
14use ::ignore::gitignore::Gitignore;
15use anyhow::{anyhow, Result};
16use futures::{Stream, StreamExt};
17pub use fuzzy::{match_paths, PathMatch};
18use gpui::{
19 executor,
20 sum_tree::{self, Cursor, Edit, SumTree},
21 AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task,
22 UpgradeModelHandle, WeakModelHandle,
23};
24use lazy_static::lazy_static;
25use parking_lot::Mutex;
26use postage::{
27 prelude::{Sink as _, Stream as _},
28 watch,
29};
30use smol::channel::{self, Sender};
31use std::{
32 cmp::{self, Ordering},
33 collections::HashMap,
34 convert::{TryFrom, TryInto},
35 ffi::{OsStr, OsString},
36 fmt,
37 future::Future,
38 ops::Deref,
39 path::{Path, PathBuf},
40 sync::{
41 atomic::{AtomicUsize, Ordering::SeqCst},
42 Arc,
43 },
44 time::{Duration, SystemTime},
45};
46use zrpc::{PeerId, TypedEnvelope};
47
lazy_static! {
    // Shared `&OsStr` for ".gitignore" so scans can compare file names
    // without allocating a new `OsString` per comparison.
    static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
}
51
/// Progress of the background filesystem scan for a local worktree.
#[derive(Clone, Debug)]
enum ScanState {
    /// No scan in progress; the background snapshot is settled.
    Idle,
    /// A scan pass is currently running.
    Scanning,
    /// The last scan failed; the error is shared so the state stays `Clone`.
    Err(Arc<anyhow::Error>),
}
58
/// A tree of files either rooted on the local filesystem (`Local`) or
/// mirrored from a collaborator over RPC (`Remote`).
pub enum Worktree {
    Local(LocalWorktree),
    Remote(RemoteWorktree),
}
63
impl Entity for Worktree {
    type Event = ();

    /// On model teardown, notify the server that this worktree is closed.
    fn release(&mut self, cx: &mut MutableAppContext) {
        // A local tree only has an RPC connection while it is shared; a remote
        // tree always has one.
        let rpc = match self {
            Self::Local(tree) => tree
                .share
                .as_ref()
                .map(|share| (share.rpc.clone(), share.remote_id)),
            Self::Remote(tree) => Some((tree.rpc.clone(), tree.remote_id)),
        };

        if let Some((rpc, worktree_id)) = rpc {
            // Fire-and-forget: the model is going away, so just log failures.
            cx.spawn(|_| async move {
                if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await {
                    log::error!("error closing worktree {}: {}", worktree_id, err);
                }
            })
            .detach();
        }
    }
}
86
87impl Worktree {
    /// Creates a local worktree model rooted at `path` and starts the
    /// background scanner task that keeps its snapshot in sync with the
    /// filesystem.
    pub async fn open_local(
        path: impl Into<Arc<Path>>,
        languages: Arc<LanguageRegistry>,
        fs: Arc<dyn Fs>,
        cx: &mut AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let (tree, scan_states_tx) = LocalWorktree::new(path, languages, fs.clone(), cx).await?;
        tree.update(cx, |tree, cx| {
            let tree = tree.as_local_mut().unwrap();
            let abs_path = tree.snapshot.abs_path.clone();
            let background_snapshot = tree.background_snapshot.clone();
            let background = cx.background().clone();
            tree._background_scanner_task = Some(cx.background().spawn(async move {
                // Watch the root with a 100ms debounce and feed the resulting
                // event stream into the scanner.
                let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
                let scanner =
                    BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
                scanner.run(events).await;
            }));
        });
        Ok(tree)
    }
109
    /// Opens a collaborator's worktree identified by `id`, authenticating
    /// with `access_token`, then builds the remote model from the response.
    pub async fn open_remote(
        rpc: Arc<rpc::Client>,
        id: u64,
        access_token: String,
        languages: Arc<LanguageRegistry>,
        cx: &mut AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let response = rpc
            .request(proto::OpenWorktree {
                worktree_id: id,
                access_token,
            })
            .await?;

        Worktree::remote(response, rpc, languages, cx).await
    }
126
    /// Builds a `Worktree::Remote` from the server's open response:
    /// deserializes the entry list into snapshot trees on the background
    /// executor, then wires up the update-applying task, snapshot polling,
    /// and RPC subscriptions.
    async fn remote(
        open_response: proto::OpenWorktreeResponse,
        rpc: Arc<rpc::Client>,
        languages: Arc<LanguageRegistry>,
        cx: &mut AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let worktree = open_response
            .worktree
            .ok_or_else(|| anyhow!("empty worktree"))?;

        let remote_id = open_response.worktree_id;
        let replica_id = open_response.replica_id as ReplicaId;
        let peers = open_response.peers;
        // Lower-cased character bag of the root name, used for fuzzy matching.
        let root_char_bag: CharBag = worktree
            .root_name
            .chars()
            .map(|c| c.to_ascii_lowercase())
            .collect();
        let root_name = worktree.root_name.clone();
        // Deserializing every entry can be expensive, so do it off the main thread.
        let (entries_by_path, entries_by_id) = cx
            .background()
            .spawn(async move {
                let mut entries_by_path_edits = Vec::new();
                let mut entries_by_id_edits = Vec::new();
                for entry in worktree.entries {
                    match Entry::try_from((&root_char_bag, entry)) {
                        Ok(entry) => {
                            entries_by_id_edits.push(Edit::Insert(PathEntry {
                                id: entry.id,
                                path: entry.path.clone(),
                                scan_id: 0,
                            }));
                            entries_by_path_edits.push(Edit::Insert(entry));
                        }
                        // Skip malformed entries rather than failing the open.
                        Err(err) => log::warn!("error for remote worktree entry {:?}", err),
                    }
                }

                let mut entries_by_path = SumTree::new();
                let mut entries_by_id = SumTree::new();
                entries_by_path.edit(entries_by_path_edits, &());
                entries_by_id.edit(entries_by_id_edits, &());
                (entries_by_path, entries_by_id)
            })
            .await;

        let worktree = cx.update(|cx| {
            cx.add_model(|cx: &mut ModelContext<Worktree>| {
                let snapshot = Snapshot {
                    id: cx.model_id(),
                    scan_id: 0,
                    // Remote trees have no local filesystem path.
                    abs_path: Path::new("").into(),
                    root_name,
                    root_char_bag,
                    ignores: Default::default(),
                    entries_by_path,
                    entries_by_id,
                    removed_entry_ids: Default::default(),
                    next_entry_id: Default::default(),
                };

                let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
                let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());

                // Apply incoming worktree updates to the watched snapshot on
                // the background executor; the task ends when the sender drops.
                cx.background()
                    .spawn(async move {
                        while let Some(update) = updates_rx.recv().await {
                            let mut snapshot = snapshot_tx.borrow().clone();
                            if let Err(error) = snapshot.apply_update(update) {
                                log::error!("error applying worktree update: {}", error);
                            }
                            *snapshot_tx.borrow_mut() = snapshot;
                        }
                    })
                    .detach();

                {
                    // Re-poll the model whenever the background snapshot
                    // changes; stop once the model itself has been dropped.
                    let mut snapshot_rx = snapshot_rx.clone();
                    cx.spawn_weak(|this, mut cx| async move {
                        while let Some(_) = snapshot_rx.recv().await {
                            if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
                                this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
                            } else {
                                break;
                            }
                        }
                    })
                    .detach();
                }

                // Keep the subscriptions alive for the model's lifetime.
                let _subscriptions = vec![
                    rpc.subscribe_from_model(remote_id, cx, Self::handle_add_peer),
                    rpc.subscribe_from_model(remote_id, cx, Self::handle_remove_peer),
                    rpc.subscribe_from_model(remote_id, cx, Self::handle_update),
                    rpc.subscribe_from_model(remote_id, cx, Self::handle_update_buffer),
                    rpc.subscribe_from_model(remote_id, cx, Self::handle_buffer_saved),
                ];

                Worktree::Remote(RemoteWorktree {
                    remote_id,
                    replica_id,
                    snapshot,
                    snapshot_rx,
                    updates_tx,
                    rpc: rpc.clone(),
                    open_buffers: Default::default(),
                    peers: peers
                        .into_iter()
                        .map(|p| (PeerId(p.peer_id), p.replica_id as ReplicaId))
                        .collect(),
                    queued_operations: Default::default(),
                    languages,
                    _subscriptions,
                })
            })
        });

        Ok(worktree)
    }
246
247 pub fn as_local(&self) -> Option<&LocalWorktree> {
248 if let Worktree::Local(worktree) = self {
249 Some(worktree)
250 } else {
251 None
252 }
253 }
254
255 pub fn as_local_mut(&mut self) -> Option<&mut LocalWorktree> {
256 if let Worktree::Local(worktree) = self {
257 Some(worktree)
258 } else {
259 None
260 }
261 }
262
263 pub fn as_remote_mut(&mut self) -> Option<&mut RemoteWorktree> {
264 if let Worktree::Remote(worktree) = self {
265 Some(worktree)
266 } else {
267 None
268 }
269 }
270
271 pub fn snapshot(&self) -> Snapshot {
272 match self {
273 Worktree::Local(worktree) => worktree.snapshot(),
274 Worktree::Remote(worktree) => worktree.snapshot(),
275 }
276 }
277
278 pub fn replica_id(&self) -> ReplicaId {
279 match self {
280 Worktree::Local(_) => 0,
281 Worktree::Remote(worktree) => worktree.replica_id,
282 }
283 }
284
285 pub fn handle_add_peer(
286 &mut self,
287 envelope: TypedEnvelope<proto::AddPeer>,
288 _: Arc<rpc::Client>,
289 cx: &mut ModelContext<Self>,
290 ) -> Result<()> {
291 match self {
292 Worktree::Local(worktree) => worktree.add_peer(envelope, cx),
293 Worktree::Remote(worktree) => worktree.add_peer(envelope, cx),
294 }
295 }
296
297 pub fn handle_remove_peer(
298 &mut self,
299 envelope: TypedEnvelope<proto::RemovePeer>,
300 _: Arc<rpc::Client>,
301 cx: &mut ModelContext<Self>,
302 ) -> Result<()> {
303 match self {
304 Worktree::Local(worktree) => worktree.remove_peer(envelope, cx),
305 Worktree::Remote(worktree) => worktree.remove_peer(envelope, cx),
306 }
307 }
308
309 pub fn handle_update(
310 &mut self,
311 envelope: TypedEnvelope<proto::UpdateWorktree>,
312 _: Arc<rpc::Client>,
313 cx: &mut ModelContext<Self>,
314 ) -> anyhow::Result<()> {
315 self.as_remote_mut()
316 .unwrap()
317 .update_from_remote(envelope, cx)
318 }
319
    /// RPC handler: a guest asked to open a buffer. Opens it locally and
    /// responds with the serialized buffer in a detached background task.
    /// Only the sharing host receives this, hence the `unwrap` on
    /// `as_local_mut`.
    pub fn handle_open_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::OpenBuffer>,
        rpc: Arc<rpc::Client>,
        cx: &mut ModelContext<Self>,
    ) -> anyhow::Result<()> {
        // Capture the receipt before the envelope is consumed below.
        let receipt = envelope.receipt();

        let response = self
            .as_local_mut()
            .unwrap()
            .open_remote_buffer(envelope, cx);

        cx.background()
            .spawn(
                async move {
                    rpc.respond(receipt, response.await?).await?;
                    Ok(())
                }
                // Errors are logged, not propagated: the handler already returned.
                .log_err(),
            )
            .detach();

        Ok(())
    }
345
346 pub fn handle_close_buffer(
347 &mut self,
348 envelope: TypedEnvelope<proto::CloseBuffer>,
349 _: Arc<rpc::Client>,
350 cx: &mut ModelContext<Self>,
351 ) -> anyhow::Result<()> {
352 self.as_local_mut()
353 .unwrap()
354 .close_remote_buffer(envelope, cx)
355 }
356
357 pub fn peers(&self) -> &HashMap<PeerId, ReplicaId> {
358 match self {
359 Worktree::Local(worktree) => &worktree.peers,
360 Worktree::Remote(worktree) => &worktree.peers,
361 }
362 }
363
364 pub fn open_buffer(
365 &mut self,
366 path: impl AsRef<Path>,
367 cx: &mut ModelContext<Self>,
368 ) -> Task<Result<ModelHandle<Buffer>>> {
369 match self {
370 Worktree::Local(worktree) => worktree.open_buffer(path.as_ref(), cx),
371 Worktree::Remote(worktree) => worktree.open_buffer(path.as_ref(), cx),
372 }
373 }
374
375 #[cfg(feature = "test-support")]
376 pub fn has_open_buffer(&self, path: impl AsRef<Path>, cx: &AppContext) -> bool {
377 let mut open_buffers: Box<dyn Iterator<Item = _>> = match self {
378 Worktree::Local(worktree) => Box::new(worktree.open_buffers.values()),
379 Worktree::Remote(worktree) => {
380 Box::new(worktree.open_buffers.values().filter_map(|buf| {
381 if let RemoteBuffer::Loaded(buf) = buf {
382 Some(buf)
383 } else {
384 None
385 }
386 }))
387 }
388 };
389
390 let path = path.as_ref();
391 open_buffers
392 .find(|buffer| {
393 if let Some(file) = buffer.upgrade(cx).and_then(|buffer| buffer.read(cx).file()) {
394 file.path.as_ref() == path
395 } else {
396 false
397 }
398 })
399 .is_some()
400 }
401
    /// RPC handler: applies remote editing operations to an open buffer.
    /// On remote trees, operations that arrive before the buffer finishes
    /// loading are queued in `RemoteBuffer::Operations`.
    pub fn handle_update_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        _: Arc<rpc::Client>,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        let payload = envelope.payload.clone();
        let buffer_id = payload.buffer_id as usize;
        // Deserialize all operations up front; fail the whole message if any
        // operation is malformed.
        let ops = payload
            .operations
            .into_iter()
            .map(|op| op.try_into())
            .collect::<anyhow::Result<Vec<_>>>()?;

        match self {
            Worktree::Local(worktree) => {
                // The host must already have the buffer open.
                let buffer = worktree
                    .open_buffers
                    .get(&buffer_id)
                    .and_then(|buf| buf.upgrade(cx))
                    .ok_or_else(|| {
                        anyhow!("invalid buffer {} in update buffer message", buffer_id)
                    })?;
                buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
            }
            Worktree::Remote(worktree) => match worktree.open_buffers.get_mut(&buffer_id) {
                // Buffer still loading: append to the pending-operation queue.
                Some(RemoteBuffer::Operations(pending_ops)) => pending_ops.extend(ops),
                Some(RemoteBuffer::Loaded(buffer)) => {
                    if let Some(buffer) = buffer.upgrade(cx) {
                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
                    } else {
                        // Buffer was dropped; start queueing again in case it
                        // is re-opened.
                        worktree
                            .open_buffers
                            .insert(buffer_id, RemoteBuffer::Operations(ops));
                    }
                }
                None => {
                    // Never seen this buffer: queue operations until it opens.
                    worktree
                        .open_buffers
                        .insert(buffer_id, RemoteBuffer::Operations(ops));
                }
            },
        }

        Ok(())
    }
448
    /// RPC handler: a guest asked the host to save a shared buffer. Saves it
    /// and responds with the saved version and mtime in a detached task.
    pub fn handle_save_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        rpc: Arc<rpc::Client>,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        let sender_id = envelope.original_sender_id()?;
        // Only buffers the sender has opened via `handle_open_buffer` may be
        // saved on their behalf.
        let buffer = self
            .as_local()
            .unwrap()
            .shared_buffers
            .get(&sender_id)
            .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
            .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;

        let receipt = envelope.receipt();
        let worktree_id = envelope.payload.worktree_id;
        let buffer_id = envelope.payload.buffer_id;
        // The save itself must run on the foreground (model) context.
        let save = cx.spawn(|_, mut cx| async move {
            buffer.update(&mut cx, |buffer, cx| buffer.save(cx))?.await
        });

        // Await the save and send the response off the main thread.
        cx.background()
            .spawn(
                async move {
                    let (version, mtime) = save.await?;

                    rpc.respond(
                        receipt,
                        proto::BufferSaved {
                            worktree_id,
                            buffer_id,
                            version: (&version).into(),
                            mtime: Some(mtime.into()),
                        },
                    )
                    .await?;

                    Ok(())
                }
                // Errors are logged, not propagated: the handler already returned.
                .log_err(),
            )
            .detach();

        Ok(())
    }
495
    /// RPC handler (guest side): the host saved a buffer; record the new
    /// version and mtime on our copy if it is still open and alive.
    pub fn handle_buffer_saved(
        &mut self,
        envelope: TypedEnvelope<proto::BufferSaved>,
        _: Arc<rpc::Client>,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        let payload = envelope.payload.clone();
        // Only guests receive BufferSaved, so this must be a remote tree.
        let worktree = self.as_remote_mut().unwrap();
        if let Some(buffer) = worktree
            .open_buffers
            .get(&(payload.buffer_id as usize))
            .and_then(|buf| buf.upgrade(cx))
        {
            buffer.update(cx, |buffer, cx| {
                let version = payload.version.try_into()?;
                let mtime = payload
                    .mtime
                    .ok_or_else(|| anyhow!("missing mtime"))?
                    .into();
                buffer.did_save(version, mtime, cx);
                // Explicit type annotation so the closure's `?` has a Result type.
                Result::<_, anyhow::Error>::Ok(())
            })?;
        }
        Ok(())
    }
521
    /// Pulls the latest background snapshot into the foreground copy and
    /// notifies observers. While a local scan is in flight, re-polls on a
    /// timer (or on the next yield under the fake fs used in tests); once
    /// settled, reconciles open buffers with the new snapshot.
    fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
        match self {
            Self::Local(worktree) => {
                let is_fake_fs = worktree.fs.is_fake();
                worktree.snapshot = worktree.background_snapshot.lock().clone();
                if worktree.is_scanning() {
                    // Schedule at most one pending re-poll at a time.
                    if worktree.poll_task.is_none() {
                        worktree.poll_task = Some(cx.spawn(|this, mut cx| async move {
                            if is_fake_fs {
                                // Tests drive the fake fs deterministically; a
                                // bare yield is enough to let the scanner run.
                                smol::future::yield_now().await;
                            } else {
                                smol::Timer::after(Duration::from_millis(100)).await;
                            }
                            this.update(&mut cx, |this, cx| {
                                this.as_local_mut().unwrap().poll_task = None;
                                this.poll_snapshot(cx);
                            })
                        }));
                    }
                } else {
                    worktree.poll_task.take();
                    self.update_open_buffers(cx);
                }
            }
            Self::Remote(worktree) => {
                worktree.snapshot = worktree.snapshot_rx.borrow().clone();
                self.update_open_buffers(cx);
            }
        };

        cx.notify();
    }
554
    /// Reconciles every open buffer's `File` metadata with the current
    /// snapshot: follows renames, records mtime changes (reloading clean
    /// local buffers from disk), re-associates files that reappeared at the
    /// same path, and marks files whose entries vanished as deleted. Buffers
    /// whose models were dropped are pruned from the map.
    fn update_open_buffers(&mut self, cx: &mut ModelContext<Self>) {
        // Remote trees may hold queued-operation placeholders; only loaded
        // buffers participate.
        let open_buffers: Box<dyn Iterator<Item = _>> = match &self {
            Self::Local(worktree) => Box::new(worktree.open_buffers.iter()),
            Self::Remote(worktree) => {
                Box::new(worktree.open_buffers.iter().filter_map(|(id, buf)| {
                    if let RemoteBuffer::Loaded(buf) = buf {
                        Some((id, buf))
                    } else {
                        None
                    }
                }))
            }
        };

        let mut buffers_to_delete = Vec::new();
        for (buffer_id, buffer) in open_buffers {
            if let Some(buffer) = buffer.upgrade(cx) {
                buffer.update(cx, |buffer, cx| {
                    let buffer_is_clean = !buffer.is_dirty();

                    if let Some(file) = buffer.file_mut() {
                        let mut file_changed = false;

                        if let Some(entry) = file
                            .entry_id
                            .and_then(|entry_id| self.entry_for_id(entry_id))
                        {
                            // Entry still exists: follow renames by id.
                            if entry.path != file.path {
                                file.path = entry.path.clone();
                                file_changed = true;
                            }

                            if entry.mtime != file.mtime {
                                file.mtime = entry.mtime;
                                file_changed = true;
                                // Only reload from disk when the buffer has no
                                // unsaved edits that would be clobbered.
                                if let Some(worktree) = self.as_local() {
                                    if buffer_is_clean {
                                        let abs_path = worktree.absolutize(&file.path);
                                        refresh_buffer(abs_path, &worktree.fs, cx);
                                    }
                                }
                            }
                        } else if let Some(entry) = self.entry_for_path(&file.path) {
                            // Entry id is gone but a file exists at the same
                            // path (e.g. delete + recreate): re-associate.
                            file.entry_id = Some(entry.id);
                            file.mtime = entry.mtime;
                            if let Some(worktree) = self.as_local() {
                                if buffer_is_clean {
                                    let abs_path = worktree.absolutize(&file.path);
                                    refresh_buffer(abs_path, &worktree.fs, cx);
                                }
                            }
                            file_changed = true;
                        } else if !file.is_deleted() {
                            // File disappeared from the tree: a clean buffer
                            // becomes dirty so the user notices before losing it.
                            if buffer_is_clean {
                                cx.emit(editor::buffer::Event::Dirtied);
                            }
                            file.entry_id = None;
                            file_changed = true;
                        }

                        if file_changed {
                            cx.emit(editor::buffer::Event::FileHandleChanged);
                        }
                    }
                });
            } else {
                buffers_to_delete.push(*buffer_id);
            }
        }

        // Drop entries for buffers whose models no longer exist.
        for buffer_id in buffers_to_delete {
            match self {
                Self::Local(worktree) => {
                    worktree.open_buffers.remove(&buffer_id);
                }
                Self::Remote(worktree) => {
                    worktree.open_buffers.remove(&buffer_id);
                }
            }
        }
    }
636}
637
638impl Deref for Worktree {
639 type Target = Snapshot;
640
641 fn deref(&self) -> &Self::Target {
642 match self {
643 Worktree::Local(worktree) => &worktree.snapshot,
644 Worktree::Remote(worktree) => &worktree.snapshot,
645 }
646 }
647}
648
/// A worktree rooted on this machine's filesystem.
pub struct LocalWorktree {
    // Foreground copy of the tree state, refreshed by `poll_snapshot`.
    snapshot: Snapshot,
    // Snapshot mutated by the background scanner; shared via a mutex.
    background_snapshot: Arc<Mutex<Snapshot>>,
    // Most recent scan state reported by the background scanner.
    last_scan_state_rx: watch::Receiver<ScanState>,
    // Keeps the scanner task alive for the worktree's lifetime.
    _background_scanner_task: Option<Task<()>>,
    // Pending `poll_snapshot` retry while a scan is in progress.
    poll_task: Option<Task<()>>,
    // Present only while this worktree is shared with collaborators.
    share: Option<ShareState>,
    // Weak handles to buffers opened from this tree, keyed by buffer id.
    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
    // Strong handles to buffers opened on behalf of each guest peer.
    shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
    peers: HashMap<PeerId, ReplicaId>,
    languages: Arc<LanguageRegistry>,
    queued_operations: Vec<(u64, Operation)>,
    fs: Arc<dyn Fs>,
}
663
664impl LocalWorktree {
    /// Constructs the worktree model for `path` (without starting the
    /// scanner — see `Worktree::open_local`) and returns the sender the
    /// scanner will use to report `ScanState` transitions.
    async fn new(
        path: impl Into<Arc<Path>>,
        languages: Arc<LanguageRegistry>,
        fs: Arc<dyn Fs>,
        cx: &mut AsyncAppContext,
    ) -> Result<(ModelHandle<Worktree>, Sender<ScanState>)> {
        let abs_path = path.into();
        // Worktree-relative paths are rooted at the empty path.
        let path: Arc<Path> = Arc::from(Path::new(""));
        let next_entry_id = AtomicUsize::new(0);

        // After determining whether the root entry is a file or a directory, populate the
        // snapshot's "root name", which will be used for the purpose of fuzzy matching.
        let root_name = abs_path
            .file_name()
            .map_or(String::new(), |f| f.to_string_lossy().to_string());
        let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
        let metadata = fs.metadata(&abs_path).await?;

        let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
        // The tree starts in the Scanning state until the scanner reports in.
        let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
        let tree = cx.add_model(move |cx: &mut ModelContext<Worktree>| {
            let mut snapshot = Snapshot {
                id: cx.model_id(),
                scan_id: 0,
                abs_path,
                root_name,
                root_char_bag,
                ignores: Default::default(),
                entries_by_path: Default::default(),
                entries_by_id: Default::default(),
                removed_entry_ids: Default::default(),
                next_entry_id: Arc::new(next_entry_id),
            };
            // Seed the snapshot with the root entry if the path exists.
            if let Some(metadata) = metadata {
                snapshot.insert_entry(Entry::new(
                    path.into(),
                    &metadata,
                    &snapshot.next_entry_id,
                    snapshot.root_char_bag,
                ));
            }

            let tree = Self {
                snapshot: snapshot.clone(),
                background_snapshot: Arc::new(Mutex::new(snapshot)),
                last_scan_state_rx,
                _background_scanner_task: None,
                share: None,
                poll_task: None,
                open_buffers: Default::default(),
                shared_buffers: Default::default(),
                queued_operations: Default::default(),
                peers: Default::default(),
                languages,
                fs,
            };

            // Forward scan-state changes into `last_scan_state_rx`, re-poll
            // the snapshot, and — when a scan completes while shared — queue
            // the finished snapshot for diffing to collaborators.
            cx.spawn_weak(|this, mut cx| async move {
                while let Ok(scan_state) = scan_states_rx.recv().await {
                    if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
                        let to_send = handle.update(&mut cx, |this, cx| {
                            last_scan_state_tx.blocking_send(scan_state).ok();
                            this.poll_snapshot(cx);
                            let tree = this.as_local_mut().unwrap();
                            if !tree.is_scanning() {
                                if let Some(share) = tree.share.as_ref() {
                                    Some((tree.snapshot(), share.snapshots_tx.clone()))
                                } else {
                                    None
                                }
                            } else {
                                None
                            }
                        });

                        // Send outside the model update to avoid blocking it.
                        if let Some((snapshot, snapshots_to_send_tx)) = to_send {
                            if let Err(err) = snapshots_to_send_tx.send(snapshot).await {
                                log::error!("error submitting snapshot to send {}", err);
                            }
                        }
                    } else {
                        // Model dropped; stop listening.
                        break;
                    }
                }
            })
            .detach();

            Worktree::Local(tree)
        });

        Ok((tree, scan_states_tx))
    }
757
    /// Opens a buffer for `path`, reusing an already-open buffer for the
    /// same path when one exists; otherwise loads the file from disk and
    /// registers the new buffer in `open_buffers`.
    pub fn open_buffer(
        &mut self,
        path: &Path,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        let handle = cx.handle();

        // If there is already a buffer for the given path, then return it.
        let mut existing_buffer = None;
        // `retain` doubles as garbage collection for dropped weak handles.
        self.open_buffers.retain(|_buffer_id, buffer| {
            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                if let Some(file) = buffer.read(cx.as_ref()).file() {
                    if file.worktree_id() == handle.id() && file.path.as_ref() == path {
                        existing_buffer = Some(buffer);
                    }
                }
                true
            } else {
                false
            }
        });

        let path = Arc::from(path);
        cx.spawn(|this, mut cx| async move {
            if let Some(existing_buffer) = existing_buffer {
                Ok(existing_buffer)
            } else {
                let (file, contents) = this
                    .update(&mut cx, |this, cx| this.as_local().unwrap().load(&path, cx))
                    .await?;
                let buffer = cx.add_model(|cx| {
                    let language = file.select_language(cx);
                    // Replica 0: the local host authored this buffer.
                    Buffer::from_history(0, History::new(contents.into()), Some(file), language, cx)
                });
                this.update(&mut cx, |this, _| {
                    let this = this
                        .as_local_mut()
                        .ok_or_else(|| anyhow!("must be a local worktree"))?;
                    // Store only a weak handle; callers own the buffer's lifetime.
                    this.open_buffers.insert(buffer.id(), buffer.downgrade());
                    Ok(buffer)
                })
            }
        })
    }
802
    /// Opens a buffer on behalf of a guest peer, retains a strong handle in
    /// `shared_buffers` so it outlives the guest's session, and serializes
    /// it for the RPC response.
    pub fn open_remote_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::OpenBuffer>,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<proto::OpenBufferResponse>> {
        let peer_id = envelope.original_sender_id();
        let path = Path::new(&envelope.payload.path);

        let buffer = self.open_buffer(path, cx);

        cx.spawn(|this, mut cx| async move {
            let buffer = buffer.await?;
            this.update(&mut cx, |this, cx| {
                this.as_local_mut()
                    .unwrap()
                    .shared_buffers
                    // `peer_id?` surfaces a malformed envelope as an error here.
                    .entry(peer_id?)
                    .or_default()
                    .insert(buffer.id() as u64, buffer.clone());

                Ok(proto::OpenBufferResponse {
                    buffer: Some(buffer.update(cx.as_mut(), |buffer, cx| buffer.to_proto(cx))),
                })
            })
        })
    }
829
830 pub fn close_remote_buffer(
831 &mut self,
832 envelope: TypedEnvelope<proto::CloseBuffer>,
833 cx: &mut ModelContext<Worktree>,
834 ) -> Result<()> {
835 if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
836 shared_buffers.remove(&envelope.payload.buffer_id);
837 cx.notify();
838 }
839
840 Ok(())
841 }
842
843 pub fn add_peer(
844 &mut self,
845 envelope: TypedEnvelope<proto::AddPeer>,
846 cx: &mut ModelContext<Worktree>,
847 ) -> Result<()> {
848 let peer = envelope
849 .payload
850 .peer
851 .as_ref()
852 .ok_or_else(|| anyhow!("empty peer"))?;
853 self.peers
854 .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
855 cx.notify();
856
857 Ok(())
858 }
859
860 pub fn remove_peer(
861 &mut self,
862 envelope: TypedEnvelope<proto::RemovePeer>,
863 cx: &mut ModelContext<Worktree>,
864 ) -> Result<()> {
865 let peer_id = PeerId(envelope.payload.peer_id);
866 let replica_id = self
867 .peers
868 .remove(&peer_id)
869 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
870 self.shared_buffers.remove(&peer_id);
871 for (_, buffer) in &self.open_buffers {
872 if let Some(buffer) = buffer.upgrade(cx) {
873 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
874 }
875 }
876 cx.notify();
877
878 Ok(())
879 }
880
    /// Returns a future that resolves once the current background scan has
    /// finished (reaching `Idle` or `Err`), or immediately if no scan is
    /// running. Also resolves if the scanner channel closes.
    pub fn scan_complete(&self) -> impl Future<Output = ()> {
        let mut scan_state_rx = self.last_scan_state_rx.clone();
        async move {
            // Check the current state first, then wait for transitions.
            let mut scan_state = Some(scan_state_rx.borrow().clone());
            while let Some(ScanState::Scanning) = scan_state {
                scan_state = scan_state_rx.recv().await;
            }
        }
    }
890
891 fn is_scanning(&self) -> bool {
892 if let ScanState::Scanning = *self.last_scan_state_rx.borrow() {
893 true
894 } else {
895 false
896 }
897 }
898
    /// Returns a clone of the foreground (main-thread) snapshot.
    pub fn snapshot(&self) -> Snapshot {
        self.snapshot.clone()
    }
902
    /// The absolute filesystem path of the worktree root.
    pub fn abs_path(&self) -> &Path {
        self.snapshot.abs_path.as_ref()
    }
906
    /// Whether the given absolute path lies inside this worktree's root.
    pub fn contains_abs_path(&self, path: &Path) -> bool {
        path.starts_with(&self.snapshot.abs_path)
    }
910
911 fn absolutize(&self, path: &Path) -> PathBuf {
912 if path.file_name().is_some() {
913 self.snapshot.abs_path.join(path)
914 } else {
915 self.snapshot.abs_path.to_path_buf()
916 }
917 }
918
    /// Loads the file at the given worktree-relative path, refreshing its
    /// snapshot entry so the returned `File` metadata is current.
    fn load(&self, path: &Path, cx: &mut ModelContext<Worktree>) -> Task<Result<(File, String)>> {
        let handle = cx.handle();
        let path = Arc::from(path);
        let abs_path = self.absolutize(&path);
        let background_snapshot = self.background_snapshot.clone();
        let fs = self.fs.clone();
        cx.spawn(|this, mut cx| async move {
            let text = fs.load(&abs_path).await?;
            // Eagerly populate the snapshot with an updated entry for the loaded file
            let entry = refresh_entry(fs.as_ref(), &background_snapshot, path, &abs_path).await?;
            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
            Ok((File::new(entry.id, handle, entry.path, entry.mtime), text))
        })
    }
933
    /// Writes `text` to a (possibly new) path and re-registers the buffer
    /// under that path, returning the resulting `File` for the buffer.
    pub fn save_buffer_as(
        &self,
        buffer: ModelHandle<Buffer>,
        path: impl Into<Arc<Path>>,
        text: Rope,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<File>> {
        let save = self.save(path, text, cx);
        cx.spawn(|this, mut cx| async move {
            let entry = save.await?;
            this.update(&mut cx, |this, cx| {
                // Track the buffer at its (new) path; weak so we don't keep it alive.
                this.as_local_mut()
                    .unwrap()
                    .open_buffers
                    .insert(buffer.id(), buffer.downgrade());
                Ok(File::new(entry.id, cx.handle(), entry.path, entry.mtime))
            })
        })
    }
953
    /// Writes `text` to disk at the worktree-relative `path`, refreshes the
    /// snapshot entry for that path, and re-polls the snapshot.
    fn save(
        &self,
        path: impl Into<Arc<Path>>,
        text: Rope,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<Entry>> {
        let path = path.into();
        let abs_path = self.absolutize(&path);
        let background_snapshot = self.background_snapshot.clone();
        let fs = self.fs.clone();
        // Disk I/O and the entry refresh run on the background executor.
        let save = cx.background().spawn(async move {
            fs.save(&abs_path, &text).await?;
            refresh_entry(fs.as_ref(), &background_snapshot, path.clone(), &abs_path).await
        });

        cx.spawn(|this, mut cx| async move {
            let entry = save.await?;
            // Fold the refreshed entry into the foreground snapshot.
            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
            Ok(entry)
        })
    }
975
    /// Shares this worktree with collaborators: registers it with the
    /// server, starts the snapshot-diffing task, and subscribes to the
    /// guest-facing RPC messages. Returns the remote id and access token
    /// for inviting others.
    pub fn share(
        &mut self,
        rpc: Arc<rpc::Client>,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<anyhow::Result<(u64, String)>> {
        let snapshot = self.snapshot();
        let share_request = self.share_request(cx);
        cx.spawn(|this, mut cx| async move {
            let share_request = share_request.await;
            let share_response = rpc.request(share_request).await?;
            let remote_id = share_response.worktree_id;

            log::info!("sharing worktree {:?}", share_response);
            let (snapshots_to_send_tx, snapshots_to_send_rx) =
                smol::channel::unbounded::<Snapshot>();

            // For each completed scan, send guests a diff against the last
            // snapshot they received rather than the full tree.
            cx.background()
                .spawn({
                    let rpc = rpc.clone();
                    async move {
                        let mut prev_snapshot = snapshot;
                        while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
                            let message = snapshot.build_update(&prev_snapshot, remote_id);
                            match rpc.send(message).await {
                                // Only advance the baseline if the diff was delivered.
                                Ok(()) => prev_snapshot = snapshot,
                                Err(err) => log::error!("error sending snapshot diff {}", err),
                            }
                        }
                    }
                })
                .detach();

            this.update(&mut cx, |worktree, cx| {
                // Host-side message handlers; dropped (unsubscribed) when the
                // share ends.
                let _subscriptions = vec![
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_add_peer),
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_remove_peer),
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_open_buffer),
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_close_buffer),
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_update_buffer),
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_save_buffer),
                ];

                let worktree = worktree.as_local_mut().unwrap();
                worktree.share = Some(ShareState {
                    rpc,
                    remote_id: share_response.worktree_id,
                    snapshots_tx: snapshots_to_send_tx,
                    _subscriptions,
                });
            });

            Ok((remote_id, share_response.access_token))
        })
    }
1030
    /// Serializes the current snapshot into a `ShareWorktree` request on the
    /// background executor (the entry list may be large).
    fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<proto::ShareWorktree> {
        let snapshot = self.snapshot();
        let root_name = self.root_name.clone();
        cx.background().spawn(async move {
            let entries = snapshot
                .entries_by_path
                .cursor::<(), ()>()
                .map(Into::into)
                .collect();
            proto::ShareWorktree {
                worktree: Some(proto::Worktree { root_name, entries }),
            }
        })
    }
1045}
1046
/// Asynchronously reloads a buffer's contents from `abs_path` after the file
/// changed on disk. Failures are logged rather than surfaced, since this is
/// a best-effort refresh. Callers are expected to only invoke this for
/// buffers without unsaved edits.
pub fn refresh_buffer(abs_path: PathBuf, fs: &Arc<dyn Fs>, cx: &mut ModelContext<Buffer>) {
    let fs = fs.clone();
    cx.spawn(|buffer, mut cx| async move {
        let new_text = fs.load(&abs_path).await;
        match new_text {
            Err(error) => log::error!("error refreshing buffer after file changed: {}", error),
            Ok(new_text) => {
                buffer
                    .update(&mut cx, |buffer, cx| {
                        buffer.set_text_from_disk(new_text.into(), cx)
                    })
                    .await;
            }
        }
    })
    .detach()
}
1064
impl Deref for LocalWorktree {
    type Target = Snapshot;

    // Deref to the foreground snapshot so path/entry queries read naturally.
    fn deref(&self) -> &Self::Target {
        &self.snapshot
    }
}
1072
1073impl fmt::Debug for LocalWorktree {
1074 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1075 self.snapshot.fmt(f)
1076 }
1077}
1078
/// State held by a local worktree only while it is shared with guests.
struct ShareState {
    rpc: Arc<rpc::Client>,
    // Id assigned to this worktree by the server when sharing began.
    remote_id: u64,
    // Completed snapshots queued for diffing and broadcast to guests.
    snapshots_tx: Sender<Snapshot>,
    // Keeps host-side RPC handlers registered while sharing.
    _subscriptions: Vec<rpc::Subscription>,
}
1085
/// A worktree mirrored from a collaborating host over RPC.
pub struct RemoteWorktree {
    // The host's id for this worktree; used to address RPC messages.
    remote_id: u64,
    // Foreground copy of the tree, refreshed from `snapshot_rx`.
    snapshot: Snapshot,
    // Latest snapshot produced by the background update-applying task.
    snapshot_rx: watch::Receiver<Snapshot>,
    rpc: Arc<rpc::Client>,
    // Incoming `UpdateWorktree` payloads destined for the background task.
    updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
    replica_id: ReplicaId,
    // Loaded buffers, or queued operations for buffers still loading.
    open_buffers: HashMap<usize, RemoteBuffer>,
    peers: HashMap<PeerId, ReplicaId>,
    languages: Arc<LanguageRegistry>,
    queued_operations: Vec<(u64, Operation)>,
    // Keeps guest-side RPC handlers registered for the model's lifetime.
    _subscriptions: Vec<rpc::Subscription>,
}
1099
1100impl RemoteWorktree {
    /// Opens a buffer for `path`, reusing an already-open buffer when one
    /// exists; otherwise requests the buffer from the host and applies any
    /// operations that arrived while it was loading.
    pub fn open_buffer(
        &mut self,
        path: &Path,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        let handle = cx.handle();
        let mut existing_buffer = None;
        // `retain` doubles as garbage collection for dropped weak handles.
        self.open_buffers.retain(|_buffer_id, buffer| {
            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                if let Some(file) = buffer.read(cx.as_ref()).file() {
                    if file.worktree_id() == handle.id() && file.path.as_ref() == path {
                        existing_buffer = Some(buffer);
                    }
                }
                true
            } else {
                false
            }
        });

        let rpc = self.rpc.clone();
        let replica_id = self.replica_id;
        let remote_worktree_id = self.remote_id;
        // The protocol carries paths as strings.
        let path = path.to_string_lossy().to_string();
        cx.spawn(|this, mut cx| async move {
            if let Some(existing_buffer) = existing_buffer {
                Ok(existing_buffer)
            } else {
                // The file must exist in our snapshot before asking the host.
                let entry = this
                    .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned())
                    .ok_or_else(|| anyhow!("file does not exist"))?;
                let file = File::new(entry.id, handle, entry.path, entry.mtime);
                let language = cx.read(|cx| file.select_language(cx));
                let response = rpc
                    .request(proto::OpenBuffer {
                        worktree_id: remote_worktree_id as u64,
                        path,
                    })
                    .await?;
                let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
                let buffer_id = remote_buffer.id as usize;
                let buffer = cx.add_model(|cx| {
                    Buffer::from_proto(replica_id, remote_buffer, Some(file), language, cx).unwrap()
                });
                this.update(&mut cx, |this, cx| {
                    let this = this.as_remote_mut().unwrap();
                    // Replace any queued-operation placeholder and replay the
                    // operations that accumulated while the buffer loaded.
                    if let Some(RemoteBuffer::Operations(pending_ops)) = this
                        .open_buffers
                        .insert(buffer_id, RemoteBuffer::Loaded(buffer.downgrade()))
                    {
                        buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?;
                    }
                    Result::<_, anyhow::Error>::Ok(())
                })?;
                Ok(buffer)
            }
        })
    }
1159
1160 fn snapshot(&self) -> Snapshot {
1161 self.snapshot.clone()
1162 }
1163
1164 fn update_from_remote(
1165 &mut self,
1166 envelope: TypedEnvelope<proto::UpdateWorktree>,
1167 cx: &mut ModelContext<Worktree>,
1168 ) -> Result<()> {
1169 let mut tx = self.updates_tx.clone();
1170 let payload = envelope.payload.clone();
1171 cx.background()
1172 .spawn(async move {
1173 tx.send(payload).await.expect("receiver runs to completion");
1174 })
1175 .detach();
1176
1177 Ok(())
1178 }
1179
1180 pub fn add_peer(
1181 &mut self,
1182 envelope: TypedEnvelope<proto::AddPeer>,
1183 cx: &mut ModelContext<Worktree>,
1184 ) -> Result<()> {
1185 let peer = envelope
1186 .payload
1187 .peer
1188 .as_ref()
1189 .ok_or_else(|| anyhow!("empty peer"))?;
1190 self.peers
1191 .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
1192 cx.notify();
1193 Ok(())
1194 }
1195
1196 pub fn remove_peer(
1197 &mut self,
1198 envelope: TypedEnvelope<proto::RemovePeer>,
1199 cx: &mut ModelContext<Worktree>,
1200 ) -> Result<()> {
1201 let peer_id = PeerId(envelope.payload.peer_id);
1202 let replica_id = self
1203 .peers
1204 .remove(&peer_id)
1205 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
1206 for (_, buffer) in &self.open_buffers {
1207 if let Some(buffer) = buffer.upgrade(cx) {
1208 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
1209 }
1210 }
1211 cx.notify();
1212 Ok(())
1213 }
1214}
1215
/// State of a remotely opened buffer: either operations that arrived before
/// the buffer finished loading, or a weak handle to the loaded buffer.
enum RemoteBuffer {
    Operations(Vec<Operation>),
    Loaded(WeakModelHandle<Buffer>),
}
1220
1221impl RemoteBuffer {
1222 fn upgrade(&self, cx: &impl UpgradeModelHandle) -> Option<ModelHandle<Buffer>> {
1223 match self {
1224 Self::Operations(_) => None,
1225 Self::Loaded(buffer) => buffer.upgrade(cx),
1226 }
1227 }
1228}
1229
/// An immutable view of a worktree's file-system state.
/// Cloning is cheap because the entry trees are persistent `SumTree`s.
#[derive(Clone)]
pub struct Snapshot {
    // Identifier of the worktree this snapshot belongs to.
    id: usize,
    // Incremented on every rescan or applied remote update.
    scan_id: usize,
    // Absolute path of the worktree root on disk.
    abs_path: Arc<Path>,
    // File name of the worktree root.
    root_name: String,
    // Character bag for the root name; extended per-path for fuzzy matching.
    root_char_bag: CharBag,
    // Parsed `.gitignore` per directory, with the scan_id it was last seen in.
    ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
    // Entries ordered by path.
    entries_by_path: SumTree<Entry>,
    // Entries ordered by id, enabling id -> path lookup.
    entries_by_id: SumTree<PathEntry>,
    // Maps inode -> id of a removed entry so ids can be reused across renames.
    removed_entry_ids: HashMap<u64, usize>,
    // Source of fresh entry ids, shared with the background scanner.
    next_entry_id: Arc<AtomicUsize>,
}
1243
impl Snapshot {
    /// The id of the worktree this snapshot belongs to.
    pub fn id(&self) -> usize {
        self.id
    }

    /// Computes the delta from `other` (an older snapshot) to `self`, encoded
    /// as a protobuf message suitable for sending to remote collaborators.
    ///
    /// Walks both id-ordered entry trees in lockstep: ids present only in
    /// `self` are sent as updates, ids present only in `other` are removals,
    /// and ids present in both are re-sent only when their `scan_id` differs.
    pub fn build_update(&self, other: &Self, worktree_id: u64) -> proto::UpdateWorktree {
        let mut updated_entries = Vec::new();
        let mut removed_entries = Vec::new();
        let mut self_entries = self.entries_by_id.cursor::<(), ()>().peekable();
        let mut other_entries = other.entries_by_id.cursor::<(), ()>().peekable();
        loop {
            match (self_entries.peek(), other_entries.peek()) {
                (Some(self_entry), Some(other_entry)) => match self_entry.id.cmp(&other_entry.id) {
                    Ordering::Less => {
                        // Entry exists only in the new snapshot: it was added.
                        let entry = self.entry_for_id(self_entry.id).unwrap().into();
                        updated_entries.push(entry);
                        self_entries.next();
                    }
                    Ordering::Equal => {
                        // Entry exists in both; only re-send if it changed.
                        if self_entry.scan_id != other_entry.scan_id {
                            let entry = self.entry_for_id(self_entry.id).unwrap().into();
                            updated_entries.push(entry);
                        }

                        self_entries.next();
                        other_entries.next();
                    }
                    Ordering::Greater => {
                        // Entry exists only in the old snapshot: it was removed.
                        removed_entries.push(other_entry.id as u64);
                        other_entries.next();
                    }
                },
                (Some(self_entry), None) => {
                    let entry = self.entry_for_id(self_entry.id).unwrap().into();
                    updated_entries.push(entry);
                    self_entries.next();
                }
                (None, Some(other_entry)) => {
                    removed_entries.push(other_entry.id as u64);
                    other_entries.next();
                }
                (None, None) => break,
            }
        }

        proto::UpdateWorktree {
            updated_entries,
            removed_entries,
            worktree_id,
        }
    }

    /// Applies a delta received from a remote collaborator, bumping the
    /// snapshot's `scan_id` and editing both entry trees accordingly.
    fn apply_update(&mut self, update: proto::UpdateWorktree) -> Result<()> {
        self.scan_id += 1;
        let scan_id = self.scan_id;

        let mut entries_by_path_edits = Vec::new();
        let mut entries_by_id_edits = Vec::new();
        for entry_id in update.removed_entries {
            let entry_id = entry_id as usize;
            let entry = self
                .entry_for_id(entry_id)
                .ok_or_else(|| anyhow!("unknown entry"))?;
            entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
            entries_by_id_edits.push(Edit::Remove(entry.id));
        }

        for entry in update.updated_entries {
            let entry = Entry::try_from((&self.root_char_bag, entry))?;
            // An updated entry may have moved; remove the stale path mapping
            // before inserting the new one.
            if let Some(PathEntry { path, .. }) = self.entries_by_id.get(&entry.id, &()) {
                entries_by_path_edits.push(Edit::Remove(PathKey(path.clone())));
            }
            entries_by_id_edits.push(Edit::Insert(PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                scan_id,
            }));
            entries_by_path_edits.push(Edit::Insert(entry));
        }

        self.entries_by_path.edit(entries_by_path_edits, &());
        self.entries_by_id.edit(entries_by_id_edits, &());

        Ok(())
    }

    /// Total number of file entries (including ignored files).
    pub fn file_count(&self) -> usize {
        self.entries_by_path.summary().file_count
    }

    /// Number of file entries that are not gitignored.
    pub fn visible_file_count(&self) -> usize {
        self.entries_by_path.summary().visible_file_count
    }

    /// Iterates over all files, starting at the `start`-th file.
    pub fn files(&self, start: usize) -> FileIter {
        FileIter::all(self, start)
    }

    /// Iterates over the paths of all entries, excluding the root entry
    /// (whose path is empty).
    pub fn paths(&self) -> impl Iterator<Item = &Arc<Path>> {
        let empty_path = Path::new("");
        self.entries_by_path
            .cursor::<(), ()>()
            .filter(move |entry| entry.path.as_ref() != empty_path)
            .map(|entry| &entry.path)
    }

    /// Iterates over non-ignored files, starting at the `start`-th one.
    pub fn visible_files(&self, start: usize) -> FileIter {
        FileIter::visible(self, start)
    }

    /// Iterates over the immediate children of the directory at `path`.
    fn child_entries<'a>(&'a self, path: &'a Path) -> ChildEntriesIter<'a> {
        ChildEntriesIter::new(path, self)
    }

    /// The entry for the worktree root, if the tree is non-empty.
    pub fn root_entry(&self) -> Option<&Entry> {
        self.entry_for_path("")
    }

    /// File name of the worktree root.
    pub fn root_name(&self) -> &str {
        &self.root_name
    }

    /// Looks up an entry by its path, relative to the worktree root.
    fn entry_for_path(&self, path: impl AsRef<Path>) -> Option<&Entry> {
        let mut cursor = self.entries_by_path.cursor::<_, ()>();
        if cursor.seek(&PathSearch::Exact(path.as_ref()), Bias::Left, &()) {
            cursor.item()
        } else {
            None
        }
    }

    /// Looks up an entry by id, resolving it through the id -> path mapping.
    fn entry_for_id(&self, id: usize) -> Option<&Entry> {
        let entry = self.entries_by_id.get(&id, &())?;
        self.entry_for_path(&entry.path)
    }

    /// The inode of the entry at `path`, if one exists.
    pub fn inode_for_path(&self, path: impl AsRef<Path>) -> Option<u64> {
        self.entry_for_path(path.as_ref()).map(|e| e.inode)
    }

    /// Inserts (or replaces) a single entry in both trees, reusing an existing
    /// id for the same inode or path when possible. If the entry is a
    /// `.gitignore` file, its parent directory's ignore rules are (re)parsed.
    fn insert_entry(&mut self, mut entry: Entry) -> Entry {
        if !entry.is_dir() && entry.path.file_name() == Some(&GITIGNORE) {
            let (ignore, err) = Gitignore::new(self.abs_path.join(&entry.path));
            if let Some(err) = err {
                log::error!("error in ignore file {:?} - {:?}", &entry.path, err);
            }

            let ignore_dir_path = entry.path.parent().unwrap();
            self.ignores
                .insert(ignore_dir_path.into(), (Arc::new(ignore), self.scan_id));
        }

        self.reuse_entry_id(&mut entry);
        self.entries_by_path.insert_or_replace(entry.clone(), &());
        self.entries_by_id.insert_or_replace(
            PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                scan_id: self.scan_id,
            },
            &(),
        );
        entry
    }

    /// Records the scanned children of `parent_path`, transitioning the parent
    /// from `PendingDir` to `Dir` and registering its `.gitignore` if present.
    ///
    /// Panics if `parent_path` is not a pending directory in the tree.
    fn populate_dir(
        &mut self,
        parent_path: Arc<Path>,
        entries: impl IntoIterator<Item = Entry>,
        ignore: Option<Arc<Gitignore>>,
    ) {
        let mut parent_entry = self
            .entries_by_path
            .get(&PathKey(parent_path.clone()), &())
            .unwrap()
            .clone();
        if let Some(ignore) = ignore {
            self.ignores.insert(parent_path, (ignore, self.scan_id));
        }
        if matches!(parent_entry.kind, EntryKind::PendingDir) {
            parent_entry.kind = EntryKind::Dir;
        } else {
            unreachable!();
        }

        let mut entries_by_path_edits = vec![Edit::Insert(parent_entry)];
        let mut entries_by_id_edits = Vec::new();

        for mut entry in entries {
            self.reuse_entry_id(&mut entry);
            entries_by_id_edits.push(Edit::Insert(PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                scan_id: self.scan_id,
            }));
            entries_by_path_edits.push(Edit::Insert(entry));
        }

        self.entries_by_path.edit(entries_by_path_edits, &());
        self.entries_by_id.edit(entries_by_id_edits, &());
    }

    /// Reassigns `entry.id` to a previously used id when the same inode was
    /// recently removed (rename detection) or the same path already exists.
    fn reuse_entry_id(&mut self, entry: &mut Entry) {
        if let Some(removed_entry_id) = self.removed_entry_ids.remove(&entry.inode) {
            entry.id = removed_entry_id;
        } else if let Some(existing_entry) = self.entry_for_path(&entry.path) {
            entry.id = existing_entry.id;
        }
    }

    /// Removes the entry at `path` and all of its descendants, remembering the
    /// removed ids by inode so renames can reuse them. If a `.gitignore` was
    /// removed, its parent directory's ignore record is marked stale.
    fn remove_path(&mut self, path: &Path) {
        let mut new_entries;
        let removed_entry_ids;
        {
            // Splice out the subtree rooted at `path`: keep everything before
            // it and everything after its last descendant.
            let mut cursor = self.entries_by_path.cursor::<_, ()>();
            new_entries = cursor.slice(&PathSearch::Exact(path), Bias::Left, &());
            removed_entry_ids = cursor.slice(&PathSearch::Successor(path), Bias::Left, &());
            new_entries.push_tree(cursor.suffix(&()), &());
        }
        self.entries_by_path = new_entries;

        let mut entries_by_id_edits = Vec::new();
        for entry in removed_entry_ids.cursor::<(), ()>() {
            // Keep the largest removed id per inode for later reuse.
            let removed_entry_id = self
                .removed_entry_ids
                .entry(entry.inode)
                .or_insert(entry.id);
            *removed_entry_id = cmp::max(*removed_entry_id, entry.id);
            entries_by_id_edits.push(Edit::Remove(entry.id));
        }
        self.entries_by_id.edit(entries_by_id_edits, &());

        if path.file_name() == Some(&GITIGNORE) {
            if let Some((_, scan_id)) = self.ignores.get_mut(path.parent().unwrap()) {
                *scan_id = self.scan_id;
            }
        }
    }

    /// Builds the stack of gitignore rules that applies to `path` by walking
    /// its ancestors from the root downward. Short-circuits to an "ignore
    /// everything" stack as soon as an ancestor (or the path itself) is
    /// ignored.
    fn ignore_stack_for_path(&self, path: &Path, is_dir: bool) -> Arc<IgnoreStack> {
        let mut new_ignores = Vec::new();
        for ancestor in path.ancestors().skip(1) {
            if let Some((ignore, _)) = self.ignores.get(ancestor) {
                new_ignores.push((ancestor, Some(ignore.clone())));
            } else {
                new_ignores.push((ancestor, None));
            }
        }

        let mut ignore_stack = IgnoreStack::none();
        for (parent_path, ignore) in new_ignores.into_iter().rev() {
            if ignore_stack.is_path_ignored(&parent_path, true) {
                ignore_stack = IgnoreStack::all();
                break;
            } else if let Some(ignore) = ignore {
                ignore_stack = ignore_stack.append(Arc::from(parent_path), ignore);
            }
        }

        if ignore_stack.is_path_ignored(path, is_dir) {
            ignore_stack = IgnoreStack::all();
        }

        ignore_stack
    }
}
1510
1511impl fmt::Debug for Snapshot {
1512 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1513 for entry in self.entries_by_path.cursor::<(), ()>() {
1514 for _ in entry.path.ancestors().skip(1) {
1515 write!(f, " ")?;
1516 }
1517 writeln!(f, "{:?} (inode: {})", entry.path, entry.inode)?;
1518 }
1519 Ok(())
1520 }
1521}
1522
/// A handle to a file within a worktree, identified by its path.
#[derive(Clone, PartialEq)]
pub struct File {
    // Id of the underlying worktree entry; `None` once the file is deleted.
    entry_id: Option<usize>,
    // The worktree this file belongs to.
    worktree: ModelHandle<Worktree>,
    // Path relative to the worktree root.
    pub path: Arc<Path>,
    // Modification time recorded when the entry was last observed.
    pub mtime: SystemTime,
}
1530
impl File {
    /// Creates a handle to an existing worktree entry.
    pub fn new(
        entry_id: usize,
        worktree: ModelHandle<Worktree>,
        path: Arc<Path>,
        mtime: SystemTime,
    ) -> Self {
        Self {
            entry_id: Some(entry_id),
            worktree,
            path,
            mtime,
        }
    }

    /// Broadcasts a buffer operation to collaborators when the worktree is
    /// shared (local) or remote. If the RPC request fails, the operation is
    /// stashed in the worktree's `queued_operations`.
    pub fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) {
        self.worktree.update(cx, |worktree, cx| {
            // Only send when there is an RPC connection: a shared local
            // worktree or a remote worktree.
            if let Some((rpc, remote_id)) = match worktree {
                Worktree::Local(worktree) => worktree
                    .share
                    .as_ref()
                    .map(|share| (share.rpc.clone(), share.remote_id)),
                Worktree::Remote(worktree) => Some((worktree.rpc.clone(), worktree.remote_id)),
            } {
                cx.spawn(|worktree, mut cx| async move {
                    if let Err(error) = rpc
                        .request(proto::UpdateBuffer {
                            worktree_id: remote_id,
                            buffer_id,
                            operations: vec![(&operation).into()],
                        })
                        .await
                    {
                        worktree.update(&mut cx, |worktree, _| {
                            log::error!("error sending buffer operation: {}", error);
                            match worktree {
                                Worktree::Local(t) => &mut t.queued_operations,
                                Worktree::Remote(t) => &mut t.queued_operations,
                            }
                            .push((buffer_id, operation));
                        });
                    }
                })
                .detach();
            }
        });
    }

    /// Notifies the host that this client no longer has the buffer open.
    /// Only relevant for remote worktrees; best-effort (errors are logged).
    pub fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) {
        self.worktree.update(cx, |worktree, cx| {
            if let Worktree::Remote(worktree) = worktree {
                let worktree_id = worktree.remote_id;
                let rpc = worktree.rpc.clone();
                cx.background()
                    .spawn(async move {
                        if let Err(error) = rpc
                            .send(proto::CloseBuffer {
                                worktree_id,
                                buffer_id,
                            })
                            .await
                        {
                            log::error!("error closing remote buffer: {}", error);
                        }
                    })
                    .detach();
            }
        });
    }

    /// Returns this file's path relative to the root of its worktree.
    pub fn path(&self) -> Arc<Path> {
        self.path.clone()
    }

    /// Returns this file's absolute path on disk.
    pub fn abs_path(&self, cx: &AppContext) -> PathBuf {
        self.worktree.read(cx).abs_path.join(&self.path)
    }

    /// Selects a language for this file based on its full path
    /// (worktree root name joined with the relative path).
    pub fn select_language(&self, cx: &AppContext) -> Option<Arc<Language>> {
        let worktree = self.worktree.read(cx);
        let mut full_path = PathBuf::new();
        full_path.push(worktree.root_name());
        full_path.push(&self.path);
        let languages = match self.worktree.read(cx) {
            Worktree::Local(worktree) => &worktree.languages,
            Worktree::Remote(worktree) => &worktree.languages,
        };
        languages.select_language(&full_path).cloned()
    }

    /// Returns the last component of this handle's absolute path. If this handle refers to the root
    /// of its worktree, then this method will return the name of the worktree itself.
    pub fn file_name<'a>(&'a self, cx: &'a AppContext) -> Option<OsString> {
        self.path
            .file_name()
            .or_else(|| Some(OsStr::new(self.worktree.read(cx).root_name())))
            .map(Into::into)
    }

    /// Whether the underlying entry has been deleted from the worktree.
    pub fn is_deleted(&self) -> bool {
        self.entry_id.is_none()
    }

    /// Whether the underlying entry still exists in the worktree.
    pub fn exists(&self) -> bool {
        !self.is_deleted()
    }

    /// Saves the buffer's contents for this file.
    ///
    /// For a local worktree the text is written to disk, and if the worktree
    /// is shared, collaborators are notified via `proto::BufferSaved`. For a
    /// remote worktree the host is asked to save via `proto::SaveBuffer`.
    /// Resolves to the saved version vector and resulting mtime.
    pub fn save(
        &self,
        buffer_id: u64,
        text: Rope,
        version: time::Global,
        cx: &mut MutableAppContext,
    ) -> Task<Result<(time::Global, SystemTime)>> {
        self.worktree.update(cx, |worktree, cx| match worktree {
            Worktree::Local(worktree) => {
                let rpc = worktree
                    .share
                    .as_ref()
                    .map(|share| (share.rpc.clone(), share.remote_id));
                let save = worktree.save(self.path.clone(), text, cx);
                cx.background().spawn(async move {
                    let entry = save.await?;
                    if let Some((rpc, worktree_id)) = rpc {
                        rpc.send(proto::BufferSaved {
                            worktree_id,
                            buffer_id,
                            version: (&version).into(),
                            mtime: Some(entry.mtime.into()),
                        })
                        .await?;
                    }
                    Ok((version, entry.mtime))
                })
            }
            Worktree::Remote(worktree) => {
                let rpc = worktree.rpc.clone();
                let worktree_id = worktree.remote_id;
                cx.foreground().spawn(async move {
                    let response = rpc
                        .request(proto::SaveBuffer {
                            worktree_id,
                            buffer_id,
                        })
                        .await?;
                    let version = response.version.try_into()?;
                    let mtime = response
                        .mtime
                        .ok_or_else(|| anyhow!("missing mtime"))?
                        .into();
                    Ok((version, mtime))
                })
            }
        })
    }

    /// The id of the worktree containing this file.
    pub fn worktree_id(&self) -> usize {
        self.worktree.id()
    }

    /// A key identifying the underlying entry: (worktree id, relative path).
    pub fn entry_id(&self) -> (usize, Arc<Path>) {
        (self.worktree.id(), self.path.clone())
    }
}
1696
/// A single file or directory within a worktree snapshot.
#[derive(Clone, Debug)]
pub struct Entry {
    // Unique id within the worktree; may be reused across renames.
    pub id: usize,
    // Whether this is a file, a scanned directory, or a pending directory.
    pub kind: EntryKind,
    // Path relative to the worktree root.
    pub path: Arc<Path>,
    pub inode: u64,
    pub mtime: SystemTime,
    pub is_symlink: bool,
    // Whether a `.gitignore` rule matches this entry.
    pub is_ignored: bool,
}
1707
/// The kind of a worktree entry. `PendingDir` marks a directory whose
/// children have not been scanned yet; `File` carries the path's character
/// bag used for fuzzy matching.
#[derive(Clone, Debug)]
pub enum EntryKind {
    PendingDir,
    Dir,
    File(CharBag),
}
1714
1715impl Entry {
1716 fn new(
1717 path: Arc<Path>,
1718 metadata: &fs::Metadata,
1719 next_entry_id: &AtomicUsize,
1720 root_char_bag: CharBag,
1721 ) -> Self {
1722 Self {
1723 id: next_entry_id.fetch_add(1, SeqCst),
1724 kind: if metadata.is_dir {
1725 EntryKind::PendingDir
1726 } else {
1727 EntryKind::File(char_bag_for_path(root_char_bag, &path))
1728 },
1729 path,
1730 inode: metadata.inode,
1731 mtime: metadata.mtime,
1732 is_symlink: metadata.is_symlink,
1733 is_ignored: false,
1734 }
1735 }
1736
1737 pub fn is_dir(&self) -> bool {
1738 matches!(self.kind, EntryKind::Dir | EntryKind::PendingDir)
1739 }
1740
1741 pub fn is_file(&self) -> bool {
1742 matches!(self.kind, EntryKind::File(_))
1743 }
1744}
1745
1746impl sum_tree::Item for Entry {
1747 type Summary = EntrySummary;
1748
1749 fn summary(&self) -> Self::Summary {
1750 let file_count;
1751 let visible_file_count;
1752 if self.is_file() {
1753 file_count = 1;
1754 if self.is_ignored {
1755 visible_file_count = 0;
1756 } else {
1757 visible_file_count = 1;
1758 }
1759 } else {
1760 file_count = 0;
1761 visible_file_count = 0;
1762 }
1763
1764 EntrySummary {
1765 max_path: self.path.clone(),
1766 file_count,
1767 visible_file_count,
1768 }
1769 }
1770}
1771
impl sum_tree::KeyedItem for Entry {
    type Key = PathKey;

    /// Entries are keyed (and therefore ordered) by their path.
    fn key(&self) -> Self::Key {
        PathKey(self.path.clone())
    }
}
1779
/// Aggregated statistics over a subtree of path-ordered entries.
#[derive(Clone, Debug)]
pub struct EntrySummary {
    // The greatest path in the summarized range.
    max_path: Arc<Path>,
    // Number of file entries in the range.
    file_count: usize,
    // Number of non-ignored file entries in the range.
    visible_file_count: usize,
}
1786
1787impl Default for EntrySummary {
1788 fn default() -> Self {
1789 Self {
1790 max_path: Arc::from(Path::new("")),
1791 file_count: 0,
1792 visible_file_count: 0,
1793 }
1794 }
1795}
1796
impl sum_tree::Summary for EntrySummary {
    type Context = ();

    /// Combines an adjacent right-hand summary into this one: counts add up,
    /// and the maximum path is the right-hand side's (entries are
    /// path-ordered, so `rhs` always covers greater paths).
    fn add_summary(&mut self, rhs: &Self, _: &()) {
        self.max_path = rhs.max_path.clone();
        self.file_count += rhs.file_count;
        self.visible_file_count += rhs.visible_file_count;
    }
}
1806
/// An id-ordered index record mapping an entry id back to its path.
#[derive(Clone, Debug)]
struct PathEntry {
    id: usize,
    path: Arc<Path>,
    // The scan in which this entry was last created or modified.
    scan_id: usize,
}
1813
impl sum_tree::Item for PathEntry {
    type Summary = PathEntrySummary;

    /// A single record's summary is just its own id.
    fn summary(&self) -> Self::Summary {
        PathEntrySummary { max_id: self.id }
    }
}
1821
impl sum_tree::KeyedItem for PathEntry {
    type Key = usize;

    /// Records are keyed (and therefore ordered) by entry id.
    fn key(&self) -> Self::Key {
        self.id
    }
}
1829
/// Summary over id-ordered `PathEntry` records: the greatest id in the range.
#[derive(Clone, Debug, Default)]
struct PathEntrySummary {
    max_id: usize,
}
1834
impl sum_tree::Summary for PathEntrySummary {
    type Context = ();

    /// The right-hand summary always covers greater ids, so it wins.
    fn add_summary(&mut self, summary: &Self, _: &Self::Context) {
        self.max_id = summary.max_id;
    }
}
1842
/// Allows seeking the id-ordered tree by a plain entry id.
impl<'a> sum_tree::Dimension<'a, PathEntrySummary> for usize {
    fn add_summary(&mut self, summary: &'a PathEntrySummary, _: &()) {
        *self = summary.max_id;
    }
}
1848
/// Ordering key for path-ordered entry trees: a path relative to the root.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct PathKey(Arc<Path>);
1851
impl Default for PathKey {
    /// The default key is the empty (root) path.
    fn default() -> Self {
        Self(Path::new("").into())
    }
}
1857
/// Allows seeking the path-ordered tree by an exact `PathKey`.
impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.0 = summary.max_path.clone();
    }
}
1863
/// Seek target within the path-ordered entry tree. `Exact` locates a path
/// itself; `Successor` locates the first entry *after* the path and all of
/// its descendants (useful for slicing out a whole subtree).
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum PathSearch<'a> {
    Exact(&'a Path),
    Successor(&'a Path),
}
1869
impl<'a> Ord for PathSearch<'a> {
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        match (self, other) {
            (Self::Exact(a), Self::Exact(b)) => a.cmp(b),
            (Self::Successor(a), Self::Exact(b)) => {
                // `Successor(a)` sorts after `a` and after every descendant of
                // `a`, so a seek stops at the first non-descendant entry.
                if b.starts_with(a) {
                    cmp::Ordering::Greater
                } else {
                    a.cmp(b)
                }
            }
            // Only the combinations above occur when seeking against tree
            // summaries (which are always `Exact`).
            _ => unreachable!("not sure we need the other two cases"),
        }
    }
}
1885
impl<'a> PartialOrd for PathSearch<'a> {
    /// Delegates to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}
1891
1892impl<'a> Default for PathSearch<'a> {
1893 fn default() -> Self {
1894 Self::Exact(Path::new("").into())
1895 }
1896}
1897
/// Allows seeking the path-ordered tree with a `PathSearch` target; the
/// accumulated dimension is always the `Exact` max path seen so far.
impl<'a: 'b, 'b> sum_tree::Dimension<'a, EntrySummary> for PathSearch<'b> {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        *self = Self::Exact(summary.max_path.as_ref());
    }
}
1903
/// Tree dimension counting all file entries, used to seek to the nth file.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct FileCount(usize);
1906
impl<'a> sum_tree::Dimension<'a, EntrySummary> for FileCount {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.0 += summary.file_count;
    }
}
1912
/// Tree dimension counting non-ignored file entries, used to seek to the nth
/// visible file.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct VisibleFileCount(usize);
1915
impl<'a> sum_tree::Dimension<'a, EntrySummary> for VisibleFileCount {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.0 += summary.visible_file_count;
    }
}
1921
/// Scans a worktree's directory tree off the main thread, keeping the shared
/// snapshot up to date and reporting progress through the `notify` channel.
struct BackgroundScanner {
    fs: Arc<dyn Fs>,
    // Snapshot shared with the foreground worktree model.
    snapshot: Arc<Mutex<Snapshot>>,
    // Channel used to publish Scanning/Idle/Err state transitions.
    notify: Sender<ScanState>,
    executor: Arc<executor::Background>,
}
1928
impl BackgroundScanner {
    fn new(
        snapshot: Arc<Mutex<Snapshot>>,
        notify: Sender<ScanState>,
        fs: Arc<dyn Fs>,
        executor: Arc<executor::Background>,
    ) -> Self {
        Self {
            fs,
            snapshot,
            notify,
            executor,
        }
    }

    /// The absolute path of the worktree root being scanned.
    fn abs_path(&self) -> Arc<Path> {
        self.snapshot.lock().abs_path.clone()
    }

    /// Returns a copy of the current shared snapshot.
    fn snapshot(&self) -> Snapshot {
        self.snapshot.lock().clone()
    }

    /// Main loop of the scanner: performs the initial scan, then processes
    /// batches of file-system events until either the event stream or the
    /// notification channel's receiver is dropped.
    async fn run(mut self, events_rx: impl Stream<Item = Vec<fsevent::Event>>) {
        if self.notify.send(ScanState::Scanning).await.is_err() {
            return;
        }

        if let Err(err) = self.scan_dirs().await {
            if self
                .notify
                .send(ScanState::Err(Arc::new(err)))
                .await
                .is_err()
            {
                return;
            }
        }

        if self.notify.send(ScanState::Idle).await.is_err() {
            return;
        }

        futures::pin_mut!(events_rx);
        while let Some(events) = events_rx.next().await {
            if self.notify.send(ScanState::Scanning).await.is_err() {
                break;
            }

            if !self.process_events(events).await {
                break;
            }

            if self.notify.send(ScanState::Idle).await.is_err() {
                break;
            }
        }
    }

    /// Performs the initial recursive scan of the worktree root, fanning the
    /// work out over one task per CPU fed by a shared scan queue.
    async fn scan_dirs(&mut self) -> Result<()> {
        let root_char_bag;
        let next_entry_id;
        let is_dir;
        {
            let snapshot = self.snapshot.lock();
            root_char_bag = snapshot.root_char_bag;
            next_entry_id = snapshot.next_entry_id.clone();
            is_dir = snapshot.root_entry().map_or(false, |e| e.is_dir())
        };

        if is_dir {
            let path: Arc<Path> = Arc::from(Path::new(""));
            let abs_path = self.abs_path();
            let (tx, rx) = channel::unbounded();
            // Seed the queue with the root; scan_dir enqueues subdirectories.
            tx.send(ScanJob {
                abs_path: abs_path.to_path_buf(),
                path,
                ignore_stack: IgnoreStack::none(),
                scan_queue: tx.clone(),
            })
            .await
            .unwrap();
            // Drop our sender so the queue closes once all jobs (each holding
            // a sender clone) are finished.
            drop(tx);

            self.executor
                .scoped(|scope| {
                    for _ in 0..self.executor.num_cpus() {
                        scope.spawn(async {
                            while let Ok(job) = rx.recv().await {
                                if let Err(err) = self
                                    .scan_dir(root_char_bag, next_entry_id.clone(), &job)
                                    .await
                                {
                                    log::error!("error scanning {:?}: {}", job.abs_path, err);
                                }
                            }
                        });
                    }
                })
                .await;
        }

        Ok(())
    }

    /// Scans a single directory: reads its children, computes their ignore
    /// status, records them in the snapshot, and enqueues jobs for any
    /// subdirectories.
    async fn scan_dir(
        &self,
        root_char_bag: CharBag,
        next_entry_id: Arc<AtomicUsize>,
        job: &ScanJob,
    ) -> Result<()> {
        let mut new_entries: Vec<Entry> = Vec::new();
        let mut new_jobs: Vec<ScanJob> = Vec::new();
        let mut ignore_stack = job.ignore_stack.clone();
        let mut new_ignore = None;

        let mut child_paths = self.fs.read_dir(&job.abs_path).await?;
        while let Some(child_abs_path) = child_paths.next().await {
            let child_abs_path = match child_abs_path {
                Ok(child_abs_path) => child_abs_path,
                Err(error) => {
                    log::error!("error processing entry {:?}", error);
                    continue;
                }
            };
            let child_name = child_abs_path.file_name().unwrap();
            let child_path: Arc<Path> = job.path.join(child_name).into();
            let child_metadata = match self.fs.metadata(&child_abs_path).await? {
                Some(metadata) => metadata,
                None => continue,
            };

            // If we find a .gitignore, add it to the stack of ignores used to determine which paths are ignored
            if child_name == *GITIGNORE {
                let (ignore, err) = Gitignore::new(&child_abs_path);
                if let Some(err) = err {
                    log::error!("error in ignore file {:?} - {:?}", child_name, err);
                }
                let ignore = Arc::new(ignore);
                ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
                new_ignore = Some(ignore);

                // Update the ignore status of any child entries we've already
                // processed to reflect the ignore file in the current
                // directory. Because `.gitignore` starts with a `.`, it sorts
                // near the front of the directory, so there should rarely be
                // many such entries. Update the ignore stacks of any new jobs
                // we've already created as well.
                let mut new_jobs = new_jobs.iter_mut();
                for entry in &mut new_entries {
                    entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
                    if entry.is_dir() {
                        // Jobs were pushed in the same order as directory
                        // entries, so the iterators stay in sync.
                        new_jobs.next().unwrap().ignore_stack = if entry.is_ignored {
                            IgnoreStack::all()
                        } else {
                            ignore_stack.clone()
                        };
                    }
                }
            }

            let mut child_entry = Entry::new(
                child_path.clone(),
                &child_metadata,
                &next_entry_id,
                root_char_bag,
            );

            if child_metadata.is_dir {
                let is_ignored = ignore_stack.is_path_ignored(&child_path, true);
                child_entry.is_ignored = is_ignored;
                new_entries.push(child_entry);
                new_jobs.push(ScanJob {
                    abs_path: child_abs_path,
                    path: child_path,
                    ignore_stack: if is_ignored {
                        IgnoreStack::all()
                    } else {
                        ignore_stack.clone()
                    },
                    scan_queue: job.scan_queue.clone(),
                });
            } else {
                child_entry.is_ignored = ignore_stack.is_path_ignored(&child_path, false);
                new_entries.push(child_entry);
            };
        }

        self.snapshot
            .lock()
            .populate_dir(job.path.clone(), new_entries, new_ignore);
        for new_job in new_jobs {
            job.scan_queue.send(new_job).await.unwrap();
        }

        Ok(())
    }

    /// Applies a batch of file-system events to the snapshot. Each affected
    /// path is removed and then re-inserted from fresh metadata; directories
    /// are re-scanned in parallel. Returns `false` if the worktree root can
    /// no longer be resolved (scanning should stop).
    async fn process_events(&mut self, mut events: Vec<fsevent::Event>) -> bool {
        // Work on a private copy; the shared snapshot is swapped in at the end.
        let mut snapshot = self.snapshot();
        snapshot.scan_id += 1;

        let root_abs_path = if let Ok(abs_path) = self.fs.canonicalize(&snapshot.abs_path).await {
            abs_path
        } else {
            return false;
        };
        let root_char_bag = snapshot.root_char_bag;
        let next_entry_id = snapshot.next_entry_id.clone();

        // Sort so ancestors precede descendants, then drop events covered by
        // an earlier event on an ancestor path.
        events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
        events.dedup_by(|a, b| a.path.starts_with(&b.path));

        for event in &events {
            match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => snapshot.remove_path(&path),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            }
        }

        let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
        for event in events {
            let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => Arc::from(path.to_path_buf()),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            };

            match self.fs.metadata(&event.path).await {
                Ok(Some(metadata)) => {
                    let ignore_stack = snapshot.ignore_stack_for_path(&path, metadata.is_dir);
                    let mut fs_entry = Entry::new(
                        path.clone(),
                        &metadata,
                        snapshot.next_entry_id.as_ref(),
                        snapshot.root_char_bag,
                    );
                    fs_entry.is_ignored = ignore_stack.is_all();
                    snapshot.insert_entry(fs_entry);
                    if metadata.is_dir {
                        scan_queue_tx
                            .send(ScanJob {
                                abs_path: event.path,
                                path,
                                ignore_stack,
                                scan_queue: scan_queue_tx.clone(),
                            })
                            .await
                            .unwrap();
                    }
                }
                // Path no longer exists: the earlier remove_path suffices.
                Ok(None) => {}
                Err(err) => {
                    // TODO - create a special 'error' entry in the entries tree to mark this
                    log::error!("error reading file on event {:?}", err);
                }
            }
        }

        *self.snapshot.lock() = snapshot;

        // Scan any directories that were created as part of this event batch.
        drop(scan_queue_tx);
        self.executor
            .scoped(|scope| {
                for _ in 0..self.executor.num_cpus() {
                    scope.spawn(async {
                        while let Ok(job) = scan_queue_rx.recv().await {
                            if let Err(err) = self
                                .scan_dir(root_char_bag, next_entry_id.clone(), &job)
                                .await
                            {
                                log::error!("error scanning {:?}: {}", job.abs_path, err);
                            }
                        }
                    });
                }
            })
            .await;

        // Attempt to detect renames only over a single batch of file-system events.
        self.snapshot.lock().removed_entry_ids.clear();

        self.update_ignore_statuses().await;
        true
    }

    /// Re-applies gitignore rules after a scan: drops records for deleted
    /// `.gitignore` files and recomputes the ignored flag beneath every
    /// directory whose `.gitignore` changed in the current scan.
    async fn update_ignore_statuses(&self) {
        let mut snapshot = self.snapshot();

        let mut ignores_to_update = Vec::new();
        let mut ignores_to_delete = Vec::new();
        for (parent_path, (_, scan_id)) in &snapshot.ignores {
            if *scan_id == snapshot.scan_id && snapshot.entry_for_path(parent_path).is_some() {
                ignores_to_update.push(parent_path.clone());
            }

            let ignore_path = parent_path.join(&*GITIGNORE);
            if snapshot.entry_for_path(ignore_path).is_none() {
                ignores_to_delete.push(parent_path.clone());
            }
        }

        for parent_path in ignores_to_delete {
            snapshot.ignores.remove(&parent_path);
            self.snapshot.lock().ignores.remove(&parent_path);
        }

        let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
        ignores_to_update.sort_unstable();
        let mut ignores_to_update = ignores_to_update.into_iter().peekable();
        while let Some(parent_path) = ignores_to_update.next() {
            // Skip descendants: updating `parent_path` already recurses into
            // them via the ignore queue.
            while ignores_to_update
                .peek()
                .map_or(false, |p| p.starts_with(&parent_path))
            {
                ignores_to_update.next().unwrap();
            }

            let ignore_stack = snapshot.ignore_stack_for_path(&parent_path, true);
            ignore_queue_tx
                .send(UpdateIgnoreStatusJob {
                    path: parent_path,
                    ignore_stack,
                    ignore_queue: ignore_queue_tx.clone(),
                })
                .await
                .unwrap();
        }
        drop(ignore_queue_tx);

        self.executor
            .scoped(|scope| {
                for _ in 0..self.executor.num_cpus() {
                    scope.spawn(async {
                        while let Ok(job) = ignore_queue_rx.recv().await {
                            self.update_ignore_status(job, &snapshot).await;
                        }
                    });
                }
            })
            .await;
    }

    /// Recomputes the ignored flag for the children of one directory, writing
    /// any changes into the shared snapshot and enqueueing a job for each
    /// child directory.
    async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) {
        let mut ignore_stack = job.ignore_stack;
        if let Some((ignore, _)) = snapshot.ignores.get(&job.path) {
            ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
        }

        let mut edits = Vec::new();
        for mut entry in snapshot.child_entries(&job.path).cloned() {
            let was_ignored = entry.is_ignored;
            entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
            if entry.is_dir() {
                let child_ignore_stack = if entry.is_ignored {
                    IgnoreStack::all()
                } else {
                    ignore_stack.clone()
                };
                job.ignore_queue
                    .send(UpdateIgnoreStatusJob {
                        path: entry.path.clone(),
                        ignore_stack: child_ignore_stack,
                        ignore_queue: job.ignore_queue.clone(),
                    })
                    .await
                    .unwrap();
            }

            if entry.is_ignored != was_ignored {
                edits.push(Edit::Insert(entry));
            }
        }
        self.snapshot.lock().entries_by_path.edit(edits, &());
    }
}
2317
2318async fn refresh_entry(
2319 fs: &dyn Fs,
2320 snapshot: &Mutex<Snapshot>,
2321 path: Arc<Path>,
2322 abs_path: &Path,
2323) -> Result<Entry> {
2324 let root_char_bag;
2325 let next_entry_id;
2326 {
2327 let snapshot = snapshot.lock();
2328 root_char_bag = snapshot.root_char_bag;
2329 next_entry_id = snapshot.next_entry_id.clone();
2330 }
2331 let entry = Entry::new(
2332 path,
2333 &fs.metadata(abs_path)
2334 .await?
2335 .ok_or_else(|| anyhow!("could not read saved file metadata"))?,
2336 &next_entry_id,
2337 root_char_bag,
2338 );
2339 Ok(snapshot.lock().insert_entry(entry))
2340}
2341
2342fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
2343 let mut result = root_char_bag;
2344 result.extend(
2345 path.to_string_lossy()
2346 .chars()
2347 .map(|c| c.to_ascii_lowercase()),
2348 );
2349 result
2350}
2351
/// A queued request for the background scanner to scan one directory.
struct ScanJob {
    // Absolute path of the directory to scan.
    abs_path: PathBuf,
    // The same directory, presumably as a worktree-relative path — confirm
    // against the scanner's job producers, which are outside this section.
    path: Arc<Path>,
    // Gitignore rules in effect for this directory's contents.
    ignore_stack: Arc<IgnoreStack>,
    // Channel on which jobs for subdirectories are enqueued.
    scan_queue: Sender<ScanJob>,
}
2358
/// A queued request to recompute the ignore status of the entries directly
/// under `path` (processed by `update_ignore_status`).
struct UpdateIgnoreStatusJob {
    // Worktree-relative path of the directory whose children are re-checked.
    path: Arc<Path>,
    // Gitignore rules inherited from the ancestors of `path`.
    ignore_stack: Arc<IgnoreStack>,
    // Queue on which follow-up jobs for child directories are re-enqueued.
    ignore_queue: Sender<UpdateIgnoreStatusJob>,
}
2364
/// Test-only helpers available on model handles that point at a `Worktree`.
pub trait WorktreeHandle {
    /// Flushes any buffered FS events by mutating the worktree's directory and
    /// waiting for the resulting notifications to be observed.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()>;
}
2372
impl WorktreeHandle for ModelHandle<Worktree> {
    // The worktree's FS event stream sometimes delivers "redundant" events for FS changes that
    // occurred before the worktree was constructed. These events can cause the worktree to perform
    // extra directory scans, and emit extra scan-state notifications.
    //
    // This function mutates the worktree's directory and waits for those mutations to be picked up,
    // to ensure that all redundant FS events have already been processed.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()> {
        use smol::future::FutureExt;

        let filename = "fs-event-sentinel";
        let root_path = cx.read(|cx| self.read(cx).abs_path.clone());
        let tree = self.clone();
        async move {
            // Create a sentinel file and wait for the worktree to observe it...
            std::fs::write(root_path.join(filename), "").unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_some())
                .await;

            // ...then remove it and wait for the removal to be observed too.
            std::fs::remove_file(root_path.join(filename)).unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_none())
                .await;

            // Finally, wait for any in-progress scan to finish.
            cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
                .await;
        }
        .boxed_local()
    }
}
2405
/// Cursor-backed iterator over the file entries of a `Snapshot`, either over
/// all files or only the visible (non-ignored) ones.
pub enum FileIter<'a> {
    All(Cursor<'a, Entry, FileCount, ()>),
    Visible(Cursor<'a, Entry, VisibleFileCount, ()>),
}
2410
2411impl<'a> FileIter<'a> {
2412 fn all(snapshot: &'a Snapshot, start: usize) -> Self {
2413 let mut cursor = snapshot.entries_by_path.cursor();
2414 cursor.seek(&FileCount(start), Bias::Right, &());
2415 Self::All(cursor)
2416 }
2417
2418 fn visible(snapshot: &'a Snapshot, start: usize) -> Self {
2419 let mut cursor = snapshot.entries_by_path.cursor();
2420 cursor.seek(&VisibleFileCount(start), Bias::Right, &());
2421 Self::Visible(cursor)
2422 }
2423
2424 fn next_internal(&mut self) {
2425 match self {
2426 Self::All(cursor) => {
2427 let ix = *cursor.seek_start();
2428 cursor.seek_forward(&FileCount(ix.0 + 1), Bias::Right, &());
2429 }
2430 Self::Visible(cursor) => {
2431 let ix = *cursor.seek_start();
2432 cursor.seek_forward(&VisibleFileCount(ix.0 + 1), Bias::Right, &());
2433 }
2434 }
2435 }
2436
2437 fn item(&self) -> Option<&'a Entry> {
2438 match self {
2439 Self::All(cursor) => cursor.item(),
2440 Self::Visible(cursor) => cursor.item(),
2441 }
2442 }
2443}
2444
2445impl<'a> Iterator for FileIter<'a> {
2446 type Item = &'a Entry;
2447
2448 fn next(&mut self) -> Option<Self::Item> {
2449 if let Some(entry) = self.item() {
2450 self.next_internal();
2451 Some(entry)
2452 } else {
2453 None
2454 }
2455 }
2456}
2457
/// Iterator over the child entries of `parent_path` within a `Snapshot`,
/// backed by a cursor over the path-ordered entry tree.
struct ChildEntriesIter<'a> {
    parent_path: &'a Path,
    cursor: Cursor<'a, Entry, PathSearch<'a>, ()>,
}
2462
2463impl<'a> ChildEntriesIter<'a> {
2464 fn new(parent_path: &'a Path, snapshot: &'a Snapshot) -> Self {
2465 let mut cursor = snapshot.entries_by_path.cursor();
2466 cursor.seek(&PathSearch::Exact(parent_path), Bias::Right, &());
2467 Self {
2468 parent_path,
2469 cursor,
2470 }
2471 }
2472}
2473
2474impl<'a> Iterator for ChildEntriesIter<'a> {
2475 type Item = &'a Entry;
2476
2477 fn next(&mut self) -> Option<Self::Item> {
2478 if let Some(item) = self.cursor.item() {
2479 if item.path.starts_with(self.parent_path) {
2480 self.cursor
2481 .seek_forward(&PathSearch::Successor(&item.path), Bias::Left, &());
2482 Some(item)
2483 } else {
2484 None
2485 }
2486 } else {
2487 None
2488 }
2489 }
2490}
2491
impl<'a> From<&'a Entry> for proto::Entry {
    /// Converts a local worktree entry into its wire representation.
    fn from(entry: &'a Entry) -> Self {
        Self {
            id: entry.id as u64,
            is_dir: entry.is_dir(),
            // Paths are sent over the wire as (lossy) UTF-8 strings.
            path: entry.path.to_string_lossy().to_string(),
            inode: entry.inode,
            mtime: Some(entry.mtime.into()),
            is_symlink: entry.is_symlink,
            is_ignored: entry.is_ignored,
        }
    }
}
2505
2506impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
2507 type Error = anyhow::Error;
2508
2509 fn try_from((root_char_bag, entry): (&'a CharBag, proto::Entry)) -> Result<Self> {
2510 if let Some(mtime) = entry.mtime {
2511 let kind = if entry.is_dir {
2512 EntryKind::Dir
2513 } else {
2514 let mut char_bag = root_char_bag.clone();
2515 char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase()));
2516 EntryKind::File(char_bag)
2517 };
2518 let path: Arc<Path> = Arc::from(Path::new(&entry.path));
2519 Ok(Entry {
2520 id: entry.id as usize,
2521 kind,
2522 path: path.clone(),
2523 inode: entry.inode,
2524 mtime: mtime.into(),
2525 is_symlink: entry.is_symlink,
2526 is_ignored: entry.is_ignored,
2527 })
2528 } else {
2529 Err(anyhow!(
2530 "missing mtime in remote worktree entry {:?}",
2531 entry.path
2532 ))
2533 }
2534 }
2535}
2536
2537#[cfg(test)]
2538mod tests {
2539 use super::*;
2540 use crate::test::*;
2541 use anyhow::Result;
2542 use fs::RealFs;
2543 use rand::prelude::*;
2544 use serde_json::json;
2545 use std::time::UNIX_EPOCH;
2546 use std::{env, fmt::Write, os::unix, time::SystemTime};
2547
    // Scans a small directory tree — reached through a symlinked root and
    // containing a symlinked subdirectory — and verifies that fuzzy path
    // search returns the expected entries.
    #[gpui::test]
    async fn test_populate_and_search(cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "root": {
                "apple": "",
                "banana": {
                    "carrot": {
                        "date": "",
                        "endive": "",
                    }
                },
                "fennel": {
                    "grape": "",
                }
            }
        }));

        // Open the worktree via a symlink to the real root, and add a
        // symlinked duplicate of "fennel" inside the tree.
        let root_link_path = dir.path().join("root_link");
        unix::fs::symlink(&dir.path().join("root"), &root_link_path).unwrap();
        unix::fs::symlink(
            &dir.path().join("root/fennel"),
            &dir.path().join("root/finnochio"),
        )
        .unwrap();

        let tree = Worktree::open_local(
            root_link_path,
            Default::default(),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        let snapshots = [cx.read(|cx| {
            let tree = tree.read(cx);
            assert_eq!(tree.file_count(), 5);
            // The symlinked directory resolves to the same inode as its target.
            assert_eq!(
                tree.inode_for_path("fennel/grape"),
                tree.inode_for_path("finnochio/grape")
            );
            tree.snapshot()
        })];
        let cancel_flag = Default::default();
        let results = cx
            .read(|cx| {
                match_paths(
                    &snapshots,
                    "bna",
                    false,
                    false,
                    10,
                    &cancel_flag,
                    cx.background().clone(),
                )
            })
            .await;
        assert_eq!(
            results
                .into_iter()
                .map(|result| result.path)
                .collect::<Vec<Arc<Path>>>(),
            vec![
                PathBuf::from("banana/carrot/date").into(),
                PathBuf::from("banana/carrot/endive").into(),
            ]
        );
    }
2618
    // A worktree containing only directories should produce no path-search
    // results, even when the query matches directory names.
    #[gpui::test]
    async fn test_search_worktree_without_files(cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "root": {
                "dir1": {},
                "dir2": {
                    "dir3": {}
                }
            }
        }));
        let tree = Worktree::open_local(
            dir.path(),
            Default::default(),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        let snapshots = [cx.read(|cx| {
            let tree = tree.read(cx);
            assert_eq!(tree.file_count(), 0);
            tree.snapshot()
        })];
        let cancel_flag = Default::default();
        let results = cx
            .read(|cx| {
                match_paths(
                    &snapshots,
                    "dir",
                    false,
                    false,
                    10,
                    &cancel_flag,
                    cx.background().clone(),
                )
            })
            .await;
        assert_eq!(
            results
                .into_iter()
                .map(|result| result.path)
                .collect::<Vec<Arc<Path>>>(),
            vec![]
        );
    }
2667
    // Edits a buffer opened from a worktree and saves it, verifying that the
    // new contents reach the file on disk.
    #[gpui::test]
    async fn test_save_file(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));
        let tree = Worktree::open_local(
            dir.path(),
            Arc::new(LanguageRegistry::new()),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        // Insert enough text that the save is non-trivial in size.
        let save = buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        let new_text = std::fs::read_to_string(dir.path().join("file1")).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
2694
    // Like test_save_file, but the worktree's root is the file itself, so the
    // buffer is opened with an empty relative path.
    #[gpui::test]
    async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));
        let file_path = dir.path().join("file1");

        let tree = Worktree::open_local(
            file_path.clone(),
            Arc::new(LanguageRegistry::new()),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        // A single-file worktree contains exactly that one file.
        cx.read(|cx| assert_eq!(tree.read(cx).file_count(), 1));

        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("", cx))
            .await
            .unwrap();
        let save = buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        let new_text = std::fs::read_to_string(file_path).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
2727
    // Renames and deletes files on disk, then checks that (1) entry ids and
    // open buffers track the moved files, (2) deleted files are flagged, and
    // (3) a remote replica converges after applying the update message.
    #[gpui::test]
    async fn test_rescan_and_remote_updates(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "a": {
                "file1": "",
                "file2": "",
                "file3": "",
            },
            "b": {
                "c": {
                    "file4": "",
                    "file5": "",
                }
            }
        }));

        let tree = Worktree::open_local(
            dir.path(),
            Default::default(),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        // Helpers: open a buffer for a path, and look up an entry's stable id.
        let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| {
            let buffer = tree.update(cx, |tree, cx| tree.open_buffer(path, cx));
            async move { buffer.await.unwrap() }
        };
        let id_for_path = |path: &'static str, cx: &gpui::TestAppContext| {
            tree.read_with(cx, |tree, _| {
                tree.entry_for_path(path)
                    .expect(&format!("no entry for path {}", path))
                    .id
            })
        };

        let buffer2 = buffer_for_path("a/file2", &mut cx).await;
        let buffer3 = buffer_for_path("a/file3", &mut cx).await;
        let buffer4 = buffer_for_path("b/c/file4", &mut cx).await;
        let buffer5 = buffer_for_path("b/c/file5", &mut cx).await;

        let file2_id = id_for_path("a/file2", &cx);
        let file3_id = id_for_path("a/file3", &cx);
        let file4_id = id_for_path("b/c/file4", &cx);

        // Wait for the initial scan.
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;

        // Create a remote copy of this worktree.
        let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
        let worktree_id = 1;
        let share_request = tree
            .update(&mut cx, |tree, cx| {
                tree.as_local().unwrap().share_request(cx)
            })
            .await;
        let remote = Worktree::remote(
            proto::OpenWorktreeResponse {
                worktree_id,
                worktree: share_request.worktree,
                replica_id: 1,
                peers: Vec::new(),
            },
            rpc::Client::new(),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        cx.read(|cx| {
            assert!(!buffer2.read(cx).is_dirty());
            assert!(!buffer3.read(cx).is_dirty());
            assert!(!buffer4.read(cx).is_dirty());
            assert!(!buffer5.read(cx).is_dirty());
        });

        // Rename and delete files and directories.
        tree.flush_fs_events(&cx).await;
        std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
        std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
        std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap();
        std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
        tree.flush_fs_events(&cx).await;

        let expected_paths = vec![
            "a",
            "a/file1",
            "a/file2.new",
            "b",
            "d",
            "d/file3",
            "d/file4",
        ];

        cx.read(|app| {
            assert_eq!(
                tree.read(app)
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );

            // Entry ids survive renames and directory moves.
            assert_eq!(id_for_path("a/file2.new", &cx), file2_id);
            assert_eq!(id_for_path("d/file3", &cx), file3_id);
            assert_eq!(id_for_path("d/file4", &cx), file4_id);

            // Open buffers follow their files to the new paths.
            assert_eq!(
                buffer2.read(app).file().unwrap().path().as_ref(),
                Path::new("a/file2.new")
            );
            assert_eq!(
                buffer3.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file3")
            );
            assert_eq!(
                buffer4.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file4")
            );
            assert_eq!(
                buffer5.read(app).file().unwrap().path().as_ref(),
                Path::new("b/c/file5")
            );

            // Only the deleted file's buffer is marked deleted.
            assert!(!buffer2.read(app).file().unwrap().is_deleted());
            assert!(!buffer3.read(app).file().unwrap().is_deleted());
            assert!(!buffer4.read(app).file().unwrap().is_deleted());
            assert!(buffer5.read(app).file().unwrap().is_deleted());
        });

        // Update the remote worktree. Check that it becomes consistent with the
        // local worktree.
        remote.update(&mut cx, |remote, cx| {
            let update_message = tree
                .read(cx)
                .snapshot()
                .build_update(&initial_snapshot, worktree_id);
            remote
                .as_remote_mut()
                .unwrap()
                .snapshot
                .apply_update(update_message)
                .unwrap();

            assert_eq!(
                remote
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );
        });
    }
2884
    // Verifies that gitignore rules are applied both during the initial scan
    // and to files created afterwards, and that ".git" itself is ignored.
    #[gpui::test]
    async fn test_rescan_with_gitignore(cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            ".git": {},
            ".gitignore": "ignored-dir\n",
            "tracked-dir": {
                "tracked-file1": "tracked contents",
            },
            "ignored-dir": {
                "ignored-file1": "ignored contents",
            }
        }));

        let tree = Worktree::open_local(
            dir.path(),
            Default::default(),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        tree.flush_fs_events(&cx).await;
        // Status from the initial scan.
        cx.read(|cx| {
            let tree = tree.read(cx);
            let tracked = tree.entry_for_path("tracked-dir/tracked-file1").unwrap();
            let ignored = tree.entry_for_path("ignored-dir/ignored-file1").unwrap();
            assert_eq!(tracked.is_ignored, false);
            assert_eq!(ignored.is_ignored, true);
        });

        // Status of files created after the initial scan.
        std::fs::write(dir.path().join("tracked-dir/tracked-file2"), "").unwrap();
        std::fs::write(dir.path().join("ignored-dir/ignored-file2"), "").unwrap();
        tree.flush_fs_events(&cx).await;
        cx.read(|cx| {
            let tree = tree.read(cx);
            let dot_git = tree.entry_for_path(".git").unwrap();
            let tracked = tree.entry_for_path("tracked-dir/tracked-file2").unwrap();
            let ignored = tree.entry_for_path("ignored-dir/ignored-file2").unwrap();
            assert_eq!(tracked.is_ignored, false);
            assert_eq!(ignored.is_ignored, true);
            assert_eq!(dot_git.is_ignored, true);
        });
    }
2930
    // Randomized test: builds a random directory tree, mutates it while
    // delivering FS events to a background scanner in random batches, and
    // checks the scanner's snapshot invariants after each batch. Finally a
    // fresh scan of the end state must match the incrementally-updated one.
    #[gpui::test(iterations = 100)]
    fn test_random(mut rng: StdRng) {
        // Both knobs are overridable from the environment.
        let operations = env::var("OPERATIONS")
            .map(|o| o.parse().unwrap())
            .unwrap_or(40);
        let initial_entries = env::var("INITIAL_ENTRIES")
            .map(|o| o.parse().unwrap())
            .unwrap_or(20);

        let root_dir = tempdir::TempDir::new("worktree-test").unwrap();
        for _ in 0..initial_entries {
            randomly_mutate_tree(root_dir.path(), 1.0, &mut rng).unwrap();
        }
        log::info!("Generated initial tree");

        let (notify_tx, _notify_rx) = smol::channel::unbounded();
        let fs = Arc::new(RealFs);
        let next_entry_id = Arc::new(AtomicUsize::new(0));
        // An empty snapshot seeded only with the root entry; kept around so a
        // second scanner can later start from the same initial state.
        let mut initial_snapshot = Snapshot {
            id: 0,
            scan_id: 0,
            abs_path: root_dir.path().into(),
            entries_by_path: Default::default(),
            entries_by_id: Default::default(),
            removed_entry_ids: Default::default(),
            ignores: Default::default(),
            root_name: Default::default(),
            root_char_bag: Default::default(),
            next_entry_id: next_entry_id.clone(),
        };
        initial_snapshot.insert_entry(Entry::new(
            Path::new("").into(),
            &smol::block_on(fs.metadata(root_dir.path()))
                .unwrap()
                .unwrap(),
            &next_entry_id,
            Default::default(),
        ));
        let mut scanner = BackgroundScanner::new(
            Arc::new(Mutex::new(initial_snapshot.clone())),
            notify_tx,
            fs.clone(),
            Arc::new(gpui::executor::Background::new()),
        );
        smol::block_on(scanner.scan_dirs()).unwrap();
        scanner.snapshot().check_invariants();

        // Interleave mutations with randomly-sized event deliveries.
        let mut events = Vec::new();
        let mut mutations_len = operations;
        while mutations_len > 1 {
            if !events.is_empty() && rng.gen_bool(0.4) {
                let len = rng.gen_range(0..=events.len());
                let to_deliver = events.drain(0..len).collect::<Vec<_>>();
                log::info!("Delivering events: {:#?}", to_deliver);
                smol::block_on(scanner.process_events(to_deliver));
                scanner.snapshot().check_invariants();
            } else {
                events.extend(randomly_mutate_tree(root_dir.path(), 0.6, &mut rng).unwrap());
                mutations_len -= 1;
            }
        }
        log::info!("Quiescing: {:#?}", events);
        smol::block_on(scanner.process_events(events));
        scanner.snapshot().check_invariants();

        // A from-scratch scan of the final tree must agree with the snapshot
        // built up incrementally from events.
        let (notify_tx, _notify_rx) = smol::channel::unbounded();
        let mut new_scanner = BackgroundScanner::new(
            Arc::new(Mutex::new(initial_snapshot)),
            notify_tx,
            scanner.fs.clone(),
            scanner.executor.clone(),
        );
        smol::block_on(new_scanner.scan_dirs()).unwrap();
        assert_eq!(scanner.snapshot().to_vec(), new_scanner.snapshot().to_vec());
    }
3006
    // Applies one random mutation to the directory tree rooted at `root_path`
    // (create a file/dir, write a .gitignore, rename, or delete), returning
    // the FS events a watcher would have reported for it. With
    // `insertion_probability` of 1.0, always creates a new file or directory.
    fn randomly_mutate_tree(
        root_path: &Path,
        insertion_probability: f64,
        rng: &mut impl Rng,
    ) -> Result<Vec<fsevent::Event>> {
        let root_path = root_path.canonicalize().unwrap();
        let (dirs, files) = read_dir_recursive(root_path.clone());

        let mut events = Vec::new();
        let mut record_event = |path: PathBuf| {
            events.push(fsevent::Event {
                event_id: SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                flags: fsevent::StreamFlags::empty(),
                path,
            });
        };

        // Insert a new file or directory; forced when the tree is still empty
        // (only the root directory exists).
        if (files.is_empty() && dirs.len() == 1) || rng.gen_bool(insertion_probability) {
            let path = dirs.choose(rng).unwrap();
            let new_path = path.join(gen_name(rng));

            if rng.gen() {
                log::info!("Creating dir {:?}", new_path.strip_prefix(root_path)?);
                std::fs::create_dir(&new_path)?;
            } else {
                log::info!("Creating file {:?}", new_path.strip_prefix(root_path)?);
                std::fs::write(&new_path, "")?;
            }
            record_event(new_path);
        } else if rng.gen_bool(0.05) {
            // Occasionally write a .gitignore listing a random subset of the
            // chosen directory's descendants.
            let ignore_dir_path = dirs.choose(rng).unwrap();
            let ignore_path = ignore_dir_path.join(&*GITIGNORE);

            let (subdirs, subfiles) = read_dir_recursive(ignore_dir_path.clone());
            let files_to_ignore = {
                let len = rng.gen_range(0..=subfiles.len());
                subfiles.choose_multiple(rng, len)
            };
            let dirs_to_ignore = {
                let len = rng.gen_range(0..subdirs.len());
                subdirs.choose_multiple(rng, len)
            };

            let mut ignore_contents = String::new();
            for path_to_ignore in files_to_ignore.chain(dirs_to_ignore) {
                write!(
                    ignore_contents,
                    "{}\n",
                    path_to_ignore
                        .strip_prefix(&ignore_dir_path)?
                        .to_str()
                        .unwrap()
                )
                .unwrap();
            }
            log::info!(
                "Creating {:?} with contents:\n{}",
                ignore_path.strip_prefix(&root_path)?,
                ignore_contents
            );
            std::fs::write(&ignore_path, ignore_contents).unwrap();
            record_event(ignore_path);
        } else {
            // Pick a random existing file or (non-root) directory to rename or
            // delete.
            let old_path = {
                let file_path = files.choose(rng);
                let dir_path = dirs[1..].choose(rng);
                file_path.into_iter().chain(dir_path).choose(rng).unwrap()
            };

            let is_rename = rng.gen();
            if is_rename {
                // Renames never move a directory into its own subtree, and may
                // overwrite an existing directory at the destination.
                let new_path_parent = dirs
                    .iter()
                    .filter(|d| !d.starts_with(old_path))
                    .choose(rng)
                    .unwrap();

                let overwrite_existing_dir =
                    !old_path.starts_with(&new_path_parent) && rng.gen_bool(0.3);
                let new_path = if overwrite_existing_dir {
                    std::fs::remove_dir_all(&new_path_parent).ok();
                    new_path_parent.to_path_buf()
                } else {
                    new_path_parent.join(gen_name(rng))
                };

                log::info!(
                    "Renaming {:?} to {}{:?}",
                    old_path.strip_prefix(&root_path)?,
                    if overwrite_existing_dir {
                        "overwrite "
                    } else {
                        ""
                    },
                    new_path.strip_prefix(&root_path)?
                );
                std::fs::rename(&old_path, &new_path)?;
                record_event(old_path.clone());
                record_event(new_path);
            } else if old_path.is_dir() {
                // Deleting a directory records an event for everything inside
                // it, since a real watcher would report each removal.
                let (dirs, files) = read_dir_recursive(old_path.clone());

                log::info!("Deleting dir {:?}", old_path.strip_prefix(&root_path)?);
                std::fs::remove_dir_all(&old_path).unwrap();
                for file in files {
                    record_event(file);
                }
                for dir in dirs {
                    record_event(dir);
                }
            } else {
                log::info!("Deleting file {:?}", old_path.strip_prefix(&root_path)?);
                std::fs::remove_file(old_path).unwrap();
                record_event(old_path.clone());
            }
        }

        Ok(events)
    }
3129
3130 fn read_dir_recursive(path: PathBuf) -> (Vec<PathBuf>, Vec<PathBuf>) {
3131 let child_entries = std::fs::read_dir(&path).unwrap();
3132 let mut dirs = vec![path];
3133 let mut files = Vec::new();
3134 for child_entry in child_entries {
3135 let child_path = child_entry.unwrap().path();
3136 if child_path.is_dir() {
3137 let (child_dirs, child_files) = read_dir_recursive(child_path);
3138 dirs.extend(child_dirs);
3139 files.extend(child_files);
3140 } else {
3141 files.push(child_path);
3142 }
3143 }
3144 (dirs, files)
3145 }
3146
3147 fn gen_name(rng: &mut impl Rng) -> String {
3148 (0..6)
3149 .map(|_| rng.sample(rand::distributions::Alphanumeric))
3150 .map(char::from)
3151 .collect()
3152 }
3153
    impl Snapshot {
        // Verifies internal consistency of a snapshot: the file iterators
        // agree with the flat entry list, traversal via child_entries visits
        // the same paths in the same order as the entries_by_path cursor, and
        // every tracked gitignore still has corresponding entries.
        fn check_invariants(&self) {
            // Every file entry must be yielded by files(); every non-ignored
            // file entry must additionally be yielded by visible_files().
            let mut files = self.files(0);
            let mut visible_files = self.visible_files(0);
            for entry in self.entries_by_path.cursor::<(), ()>() {
                if entry.is_file() {
                    assert_eq!(files.next().unwrap().inode, entry.inode);
                    if !entry.is_ignored {
                        assert_eq!(visible_files.next().unwrap().inode, entry.inode);
                    }
                }
            }
            assert!(files.next().is_none());
            assert!(visible_files.next().is_none());

            // Walk the tree through child_entries with an explicit stack;
            // inserting each batch of children at `ix` preserves sibling
            // order when they are popped off the end of the stack.
            let mut bfs_paths = Vec::new();
            let mut stack = vec![Path::new("")];
            while let Some(path) = stack.pop() {
                bfs_paths.push(path);
                let ix = stack.len();
                for child_entry in self.child_entries(path) {
                    stack.insert(ix, &child_entry.path);
                }
            }

            // The stack-based traversal must visit the same paths in the same
            // order as the flat, path-sorted cursor.
            let dfs_paths = self
                .entries_by_path
                .cursor::<(), ()>()
                .map(|e| e.path.as_ref())
                .collect::<Vec<_>>();
            assert_eq!(bfs_paths, dfs_paths);

            // Every directory with a tracked gitignore must still exist, along
            // with its .gitignore file.
            for (ignore_parent_path, _) in &self.ignores {
                assert!(self.entry_for_path(ignore_parent_path).is_some());
                assert!(self
                    .entry_for_path(ignore_parent_path.join(&*GITIGNORE))
                    .is_some());
            }
        }

        // Flattens the snapshot into a sorted (path, inode, is_ignored) list,
        // for order-insensitive comparison between snapshots.
        fn to_vec(&self) -> Vec<(&Path, u64, bool)> {
            let mut paths = Vec::new();
            for entry in self.entries_by_path.cursor::<(), ()>() {
                paths.push((entry.path.as_ref(), entry.inode, entry.is_ignored));
            }
            paths.sort_by(|a, b| a.0.cmp(&b.0));
            paths
        }
    }
3203}