1mod ignore;
2
3use self::ignore::IgnoreStack;
4use crate::{
5 editor::{self, Buffer, History, Operation, Rope},
6 fs::{self, Fs},
7 fuzzy,
8 fuzzy::CharBag,
9 language::LanguageRegistry,
10 rpc::{self, proto},
11 sum_tree::{self, Cursor, Edit, SumTree},
12 time::{self, ReplicaId},
13 util::{log_async_errors, Bias},
14};
15use ::ignore::gitignore::Gitignore;
16use anyhow::{anyhow, Result};
17use futures::{Stream, StreamExt};
18pub use fuzzy::{match_paths, PathMatch};
19use gpui::{
20 executor, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
21 Task, UpgradeModelHandle, WeakModelHandle,
22};
23use lazy_static::lazy_static;
24use parking_lot::Mutex;
25use postage::{
26 prelude::{Sink as _, Stream as _},
27 watch,
28};
29use smol::channel::{self, Sender};
30use std::{
31 cmp::{self, Ordering},
32 collections::HashMap,
33 convert::{TryFrom, TryInto},
34 ffi::{OsStr, OsString},
35 fmt,
36 future::Future,
37 ops::Deref,
38 path::{Path, PathBuf},
39 sync::{
40 atomic::{AtomicUsize, Ordering::SeqCst},
41 Arc,
42 },
43 time::{Duration, SystemTime},
44};
45use zrpc::{PeerId, TypedEnvelope};
46
lazy_static! {
    // Interned `.gitignore` file name, compared against entry names while
    // scanning so ignore files are detected without per-entry allocation.
    static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
}
50
/// Progress of the background filesystem scan, broadcast to observers of a
/// local worktree.
#[derive(Clone, Debug)]
enum ScanState {
    // No scan in progress; the snapshot is up to date.
    Idle,
    // The background scanner is currently traversing the directory tree.
    Scanning,
    // The scan failed; the error is shared so the state stays `Clone`.
    Err(Arc<anyhow::Error>),
}
57
/// A directory tree being edited: either hosted on this machine (`Local`) or
/// mirrored from a collaborator over RPC (`Remote`).
pub enum Worktree {
    Local(LocalWorktree),
    Remote(RemoteWorktree),
}
62
impl Entity for Worktree {
    type Event = ();

    // When the model is dropped, tell the server this worktree is closed —
    // for a local tree only if it was shared, for a remote tree always.
    fn release(&mut self, cx: &mut MutableAppContext) {
        // Capture the RPC client and worktree id (if any) before spawning so
        // the async block owns everything it needs.
        let rpc = match self {
            Self::Local(tree) => tree
                .share
                .as_ref()
                .map(|share| (share.rpc.clone(), share.remote_id)),
            Self::Remote(tree) => Some((tree.rpc.clone(), tree.remote_id)),
        };

        if let Some((rpc, worktree_id)) = rpc {
            // Fire-and-forget: the model is gone, so errors are only logged.
            cx.spawn(|_| async move {
                if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await {
                    log::error!("error closing worktree {}: {}", worktree_id, err);
                }
            })
            .detach();
        }
    }
}
85
86impl Worktree {
    /// Opens a worktree rooted at `path` on the local filesystem and kicks off
    /// the background scanner that keeps its snapshot up to date.
    pub async fn open_local(
        path: impl Into<Arc<Path>>,
        languages: Arc<LanguageRegistry>,
        fs: Arc<dyn Fs>,
        cx: &mut AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let (tree, scan_states_tx) = LocalWorktree::new(path, languages, fs.clone(), cx).await?;
        tree.update(cx, |tree, cx| {
            let tree = tree.as_local_mut().unwrap();
            let abs_path = tree.snapshot.abs_path.clone();
            let background_snapshot = tree.background_snapshot.clone();
            let background = cx.background().clone();
            // The scanner owns the fs watcher and publishes progress through
            // `scan_states_tx`; the task handle keeps it alive with the tree.
            tree._background_scanner_task = Some(cx.background().spawn(async move {
                let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
                let scanner =
                    BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
                scanner.run(events).await;
            }));
        });
        Ok(tree)
    }
108
109 pub async fn open_remote(
110 rpc: rpc::Client,
111 id: u64,
112 access_token: String,
113 languages: Arc<LanguageRegistry>,
114 cx: &mut AsyncAppContext,
115 ) -> Result<ModelHandle<Self>> {
116 let response = rpc
117 .request(proto::OpenWorktree {
118 worktree_id: id,
119 access_token,
120 })
121 .await?;
122
123 Worktree::remote(response, rpc, languages, cx).await
124 }
125
    /// Constructs the `Remote` variant from an `OpenWorktreeResponse`:
    /// deserializes the initial entry set on a background thread, then wires up
    /// the update pipeline and RPC subscriptions on the model.
    async fn remote(
        open_response: proto::OpenWorktreeResponse,
        rpc: rpc::Client,
        languages: Arc<LanguageRegistry>,
        cx: &mut AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let worktree = open_response
            .worktree
            .ok_or_else(|| anyhow!("empty worktree"))?;

        let remote_id = open_response.worktree_id;
        let replica_id = open_response.replica_id as ReplicaId;
        let peers = open_response.peers;
        // Lower-cased character bag of the root name, used for fuzzy matching.
        let root_char_bag: CharBag = worktree
            .root_name
            .chars()
            .map(|c| c.to_ascii_lowercase())
            .collect();
        let root_name = worktree.root_name.clone();
        // Deserializing every entry can be expensive for large trees, so build
        // both sum-trees off the main thread.
        let (entries_by_path, entries_by_id) = cx
            .background()
            .spawn(async move {
                let mut entries_by_path_edits = Vec::new();
                let mut entries_by_id_edits = Vec::new();
                for entry in worktree.entries {
                    match Entry::try_from((&root_char_bag, entry)) {
                        Ok(entry) => {
                            entries_by_id_edits.push(Edit::Insert(PathEntry {
                                id: entry.id,
                                path: entry.path.clone(),
                                scan_id: 0,
                            }));
                            entries_by_path_edits.push(Edit::Insert(entry));
                        }
                        // A malformed entry is skipped rather than failing the
                        // whole worktree.
                        Err(err) => log::warn!("error for remote worktree entry {:?}", err),
                    }
                }

                let mut entries_by_path = SumTree::new();
                let mut entries_by_id = SumTree::new();
                entries_by_path.edit(entries_by_path_edits, &());
                entries_by_id.edit(entries_by_id_edits, &());
                (entries_by_path, entries_by_id)
            })
            .await;

        let worktree = cx.update(|cx| {
            cx.add_model(|cx: &mut ModelContext<Worktree>| {
                // Remote snapshots have no absolute path; only relative entry
                // paths are meaningful on this side.
                let snapshot = Snapshot {
                    id: cx.model_id(),
                    scan_id: 0,
                    abs_path: Path::new("").into(),
                    root_name,
                    root_char_bag,
                    ignores: Default::default(),
                    entries_by_path,
                    entries_by_id,
                    removed_entry_ids: Default::default(),
                    next_entry_id: Default::default(),
                };

                // Pipeline: raw protocol updates flow into `updates_tx`, are
                // applied to a snapshot on the background thread, and the
                // result is published through a watch channel.
                let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
                let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());

                cx.background()
                    .spawn(async move {
                        while let Some(update) = updates_rx.recv().await {
                            let mut snapshot = snapshot_tx.borrow().clone();
                            if let Err(error) = snapshot.apply_update(update) {
                                log::error!("error applying worktree update: {}", error);
                            }
                            *snapshot_tx.borrow_mut() = snapshot;
                        }
                    })
                    .detach();

                {
                    // Foreground task: each new snapshot triggers a poll so the
                    // model copy and open buffers stay in sync. Holds only a
                    // weak handle so it stops when the worktree is dropped.
                    let mut snapshot_rx = snapshot_rx.clone();
                    cx.spawn_weak(|this, mut cx| async move {
                        while let Some(_) = snapshot_rx.recv().await {
                            if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
                                this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
                            } else {
                                break;
                            }
                        }
                    })
                    .detach();
                }

                // Subscriptions are dropped (and thus unregistered) with the
                // worktree.
                let _subscriptions = vec![
                    rpc.subscribe_from_model(remote_id, cx, Self::handle_add_peer),
                    rpc.subscribe_from_model(remote_id, cx, Self::handle_remove_peer),
                    rpc.subscribe_from_model(remote_id, cx, Self::handle_update),
                    rpc.subscribe_from_model(remote_id, cx, Self::handle_update_buffer),
                    rpc.subscribe_from_model(remote_id, cx, Self::handle_buffer_saved),
                ];

                Worktree::Remote(RemoteWorktree {
                    remote_id,
                    replica_id,
                    snapshot,
                    snapshot_rx,
                    updates_tx,
                    rpc: rpc.clone(),
                    open_buffers: Default::default(),
                    peers: peers
                        .into_iter()
                        .map(|p| (PeerId(p.peer_id), p.replica_id as ReplicaId))
                        .collect(),
                    languages,
                    _subscriptions,
                })
            })
        });

        Ok(worktree)
    }
244
245 pub fn as_local(&self) -> Option<&LocalWorktree> {
246 if let Worktree::Local(worktree) = self {
247 Some(worktree)
248 } else {
249 None
250 }
251 }
252
253 pub fn as_local_mut(&mut self) -> Option<&mut LocalWorktree> {
254 if let Worktree::Local(worktree) = self {
255 Some(worktree)
256 } else {
257 None
258 }
259 }
260
261 pub fn as_remote_mut(&mut self) -> Option<&mut RemoteWorktree> {
262 if let Worktree::Remote(worktree) = self {
263 Some(worktree)
264 } else {
265 None
266 }
267 }
268
269 pub fn snapshot(&self) -> Snapshot {
270 match self {
271 Worktree::Local(worktree) => worktree.snapshot(),
272 Worktree::Remote(worktree) => worktree.snapshot(),
273 }
274 }
275
276 pub fn replica_id(&self) -> ReplicaId {
277 match self {
278 Worktree::Local(_) => 0,
279 Worktree::Remote(worktree) => worktree.replica_id,
280 }
281 }
282
283 pub fn handle_add_peer(
284 &mut self,
285 envelope: TypedEnvelope<proto::AddPeer>,
286 _: rpc::Client,
287 cx: &mut ModelContext<Self>,
288 ) -> Result<()> {
289 match self {
290 Worktree::Local(worktree) => worktree.add_peer(envelope, cx),
291 Worktree::Remote(worktree) => worktree.add_peer(envelope, cx),
292 }
293 }
294
295 pub fn handle_remove_peer(
296 &mut self,
297 envelope: TypedEnvelope<proto::RemovePeer>,
298 _: rpc::Client,
299 cx: &mut ModelContext<Self>,
300 ) -> Result<()> {
301 match self {
302 Worktree::Local(worktree) => worktree.remove_peer(envelope, cx),
303 Worktree::Remote(worktree) => worktree.remove_peer(envelope, cx),
304 }
305 }
306
307 pub fn handle_update(
308 &mut self,
309 envelope: TypedEnvelope<proto::UpdateWorktree>,
310 _: rpc::Client,
311 cx: &mut ModelContext<Self>,
312 ) -> anyhow::Result<()> {
313 self.as_remote_mut()
314 .unwrap()
315 .update_from_remote(envelope, cx)
316 }
317
    /// RPC handler: a guest asked to open a buffer. Only the host (local side)
    /// receives this; the response is produced asynchronously and sent back
    /// against the request's receipt.
    pub fn handle_open_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::OpenBuffer>,
        rpc: rpc::Client,
        cx: &mut ModelContext<Self>,
    ) -> anyhow::Result<()> {
        let receipt = envelope.receipt();

        let response = self
            .as_local_mut()
            .unwrap()
            .open_remote_buffer(envelope, cx);

        // Respond off the main thread; failures are logged, not propagated.
        cx.background()
            .spawn(log_async_errors(async move {
                rpc.respond(receipt, response.await?).await?;
                Ok(())
            }))
            .detach();

        Ok(())
    }
340
341 pub fn handle_close_buffer(
342 &mut self,
343 envelope: TypedEnvelope<proto::CloseBuffer>,
344 _: rpc::Client,
345 cx: &mut ModelContext<Self>,
346 ) -> anyhow::Result<()> {
347 self.as_local_mut()
348 .unwrap()
349 .close_remote_buffer(envelope, cx)
350 }
351
352 pub fn peers(&self) -> &HashMap<PeerId, ReplicaId> {
353 match self {
354 Worktree::Local(worktree) => &worktree.peers,
355 Worktree::Remote(worktree) => &worktree.peers,
356 }
357 }
358
359 pub fn open_buffer(
360 &mut self,
361 path: impl AsRef<Path>,
362 cx: &mut ModelContext<Self>,
363 ) -> Task<Result<ModelHandle<Buffer>>> {
364 match self {
365 Worktree::Local(worktree) => worktree.open_buffer(path.as_ref(), cx),
366 Worktree::Remote(worktree) => worktree.open_buffer(path.as_ref(), cx),
367 }
368 }
369
370 #[cfg(feature = "test-support")]
371 pub fn has_open_buffer(&self, path: impl AsRef<Path>, cx: &AppContext) -> bool {
372 let mut open_buffers: Box<dyn Iterator<Item = _>> = match self {
373 Worktree::Local(worktree) => Box::new(worktree.open_buffers.values()),
374 Worktree::Remote(worktree) => {
375 Box::new(worktree.open_buffers.values().filter_map(|buf| {
376 if let RemoteBuffer::Loaded(buf) = buf {
377 Some(buf)
378 } else {
379 None
380 }
381 }))
382 }
383 };
384
385 let path = path.as_ref();
386 open_buffers
387 .find(|buffer| {
388 if let Some(file) = buffer.upgrade(cx).and_then(|buffer| buffer.read(cx).file()) {
389 file.path.as_ref() == path
390 } else {
391 false
392 }
393 })
394 .is_some()
395 }
396
    /// RPC handler: apply remote edit operations to one of our buffers.
    ///
    /// On the host, the buffer must already be open; on a guest, operations
    /// arriving before the buffer finishes loading are queued so they can be
    /// applied once it does.
    pub fn handle_update_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        _: rpc::Client,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        let payload = envelope.payload.clone();
        let buffer_id = payload.buffer_id as usize;
        // Deserialize all operations up front; a single bad op fails the
        // whole message.
        let ops = payload
            .operations
            .into_iter()
            .map(|op| op.try_into())
            .collect::<anyhow::Result<Vec<_>>>()?;

        match self {
            Worktree::Local(worktree) => {
                let buffer = worktree
                    .open_buffers
                    .get(&buffer_id)
                    .and_then(|buf| buf.upgrade(cx))
                    .ok_or_else(|| {
                        anyhow!("invalid buffer {} in update buffer message", buffer_id)
                    })?;
                buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
            }
            Worktree::Remote(worktree) => match worktree.open_buffers.get_mut(&buffer_id) {
                // Buffer still loading: extend the pending-operation queue.
                Some(RemoteBuffer::Operations(pending_ops)) => pending_ops.extend(ops),
                Some(RemoteBuffer::Loaded(buffer)) => {
                    if let Some(buffer) = buffer.upgrade(cx) {
                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
                    } else {
                        // The buffer was dropped; start queueing again in case
                        // it is reopened.
                        worktree
                            .open_buffers
                            .insert(buffer_id, RemoteBuffer::Operations(ops));
                    }
                }
                // Never seen this buffer: queue the operations for later.
                None => {
                    worktree
                        .open_buffers
                        .insert(buffer_id, RemoteBuffer::Operations(ops));
                }
            },
        }

        Ok(())
    }
443
    /// RPC handler (host only): a guest asked us to save a buffer it has open.
    /// Performs the save asynchronously and responds with the resulting
    /// version vector and mtime.
    pub fn handle_save_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        rpc: rpc::Client,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        let sender_id = envelope.original_sender_id()?;
        // Look the buffer up in the per-peer shared-buffer table; guests may
        // only save buffers they previously opened through us.
        let buffer = self
            .as_local()
            .unwrap()
            .shared_buffers
            .get(&sender_id)
            .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
            .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;

        let receipt = envelope.receipt();
        let worktree_id = envelope.payload.worktree_id;
        let buffer_id = envelope.payload.buffer_id;
        let save = buffer.update(cx, |buffer, cx| buffer.save(cx))?;

        // Finish the save and respond off the main thread; errors are logged.
        cx.background()
            .spawn(log_async_errors(async move {
                let (version, mtime) = save.await?;

                rpc.respond(
                    receipt,
                    proto::BufferSaved {
                        worktree_id,
                        buffer_id,
                        version: (&version).into(),
                        mtime: Some(mtime.into()),
                    },
                )
                .await?;

                Ok(())
            }))
            .detach();

        Ok(())
    }
485
    /// RPC handler (guest only): the host saved a buffer on our behalf; record
    /// the saved version and mtime on our local copy if it is still alive.
    pub fn handle_buffer_saved(
        &mut self,
        envelope: TypedEnvelope<proto::BufferSaved>,
        _: rpc::Client,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        let payload = envelope.payload.clone();
        let worktree = self.as_remote_mut().unwrap();
        // Silently ignore saves for buffers we no longer have open.
        if let Some(buffer) = worktree
            .open_buffers
            .get(&(payload.buffer_id as usize))
            .and_then(|buf| buf.upgrade(cx))
        {
            buffer.update(cx, |buffer, cx| {
                let version = payload.version.try_into()?;
                let mtime = payload
                    .mtime
                    .ok_or_else(|| anyhow!("missing mtime"))?
                    .into();
                buffer.did_save(version, mtime, cx);
                Result::<_, anyhow::Error>::Ok(())
            })?;
        }
        Ok(())
    }
511
    /// Refreshes the foreground snapshot from its background source and, while
    /// a local scan is still running, schedules itself to poll again.
    fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
        match self {
            Self::Local(worktree) => {
                let is_fake_fs = worktree.fs.is_fake();
                worktree.snapshot = worktree.background_snapshot.lock().clone();
                if worktree.is_scanning() {
                    // Only one pending poll at a time.
                    if worktree.poll_task.is_none() {
                        worktree.poll_task = Some(cx.spawn(|this, mut cx| async move {
                            // Fake filesystems (tests) advance on yield; real
                            // ones are throttled to avoid busy-polling.
                            if is_fake_fs {
                                smol::future::yield_now().await;
                            } else {
                                smol::Timer::after(Duration::from_millis(100)).await;
                            }
                            this.update(&mut cx, |this, cx| {
                                this.as_local_mut().unwrap().poll_task = None;
                                this.poll_snapshot(cx);
                            })
                        }));
                    }
                } else {
                    // Scan finished: stop polling and reconcile open buffers
                    // with the final snapshot.
                    worktree.poll_task.take();
                    self.update_open_buffers(cx);
                }
            }
            Self::Remote(worktree) => {
                worktree.snapshot = worktree.snapshot_rx.borrow().clone();
                self.update_open_buffers(cx);
            }
        };

        cx.notify();
    }
544
    /// Reconciles every open buffer's `File` with the latest snapshot: follows
    /// renames, picks up mtime changes (reloading clean local buffers from
    /// disk), and marks files whose entries disappeared as deleted. Dead weak
    /// handles are pruned from the buffer map.
    fn update_open_buffers(&mut self, cx: &mut ModelContext<Self>) {
        let open_buffers: Box<dyn Iterator<Item = _>> = match &self {
            Self::Local(worktree) => Box::new(worktree.open_buffers.iter()),
            Self::Remote(worktree) => {
                // Remote entries that only hold queued operations have no
                // buffer to reconcile.
                Box::new(worktree.open_buffers.iter().filter_map(|(id, buf)| {
                    if let RemoteBuffer::Loaded(buf) = buf {
                        Some((id, buf))
                    } else {
                        None
                    }
                }))
            }
        };

        let mut buffers_to_delete = Vec::new();
        for (buffer_id, buffer) in open_buffers {
            if let Some(buffer) = buffer.upgrade(cx) {
                buffer.update(cx, |buffer, cx| {
                    let buffer_is_clean = !buffer.is_dirty();

                    if let Some(file) = buffer.file_mut() {
                        let mut file_changed = false;

                        // Preferred lookup: by stable entry id (survives
                        // renames).
                        if let Some(entry) = file
                            .entry_id
                            .and_then(|entry_id| self.entry_for_id(entry_id))
                        {
                            if entry.path != file.path {
                                file.path = entry.path.clone();
                                file_changed = true;
                            }

                            if entry.mtime != file.mtime {
                                file.mtime = entry.mtime;
                                file_changed = true;
                                // Only clean local buffers are refreshed from
                                // disk; dirty edits must not be clobbered.
                                if let Some(worktree) = self.as_local() {
                                    if buffer_is_clean {
                                        let abs_path = worktree.absolutize(&file.path);
                                        refresh_buffer(abs_path, &worktree.fs, cx);
                                    }
                                }
                            }
                        } else if let Some(entry) = self.entry_for_path(&file.path) {
                            // Entry id is gone but a file exists at the same
                            // path (e.g. deleted and recreated): re-bind to it.
                            file.entry_id = Some(entry.id);
                            file.mtime = entry.mtime;
                            if let Some(worktree) = self.as_local() {
                                if buffer_is_clean {
                                    let abs_path = worktree.absolutize(&file.path);
                                    refresh_buffer(abs_path, &worktree.fs, cx);
                                }
                            }
                            file_changed = true;
                        } else if !file.is_deleted() {
                            // File vanished from the snapshot: a clean buffer
                            // becomes dirty so unsaved content isn't lost
                            // silently.
                            if buffer_is_clean {
                                cx.emit(editor::buffer::Event::Dirtied);
                            }
                            file.entry_id = None;
                            file_changed = true;
                        }

                        if file_changed {
                            cx.emit(editor::buffer::Event::FileHandleChanged);
                        }
                    }
                });
            } else {
                buffers_to_delete.push(*buffer_id);
            }
        }

        // Prune entries whose buffers have been dropped.
        for buffer_id in buffers_to_delete {
            match self {
                Self::Local(worktree) => {
                    worktree.open_buffers.remove(&buffer_id);
                }
                Self::Remote(worktree) => {
                    worktree.open_buffers.remove(&buffer_id);
                }
            }
        }
    }
626}
627
628impl Deref for Worktree {
629 type Target = Snapshot;
630
631 fn deref(&self) -> &Self::Target {
632 match self {
633 Worktree::Local(worktree) => &worktree.snapshot,
634 Worktree::Remote(worktree) => &worktree.snapshot,
635 }
636 }
637}
638
/// A worktree hosted on this machine, backed by the local filesystem.
pub struct LocalWorktree {
    // Foreground copy of the tree; refreshed from `background_snapshot` by
    // `Worktree::poll_snapshot`.
    snapshot: Snapshot,
    // Snapshot mutated directly by the background scanner.
    background_snapshot: Arc<Mutex<Snapshot>>,
    // Most recent scan state; used to decide when polling can stop.
    last_scan_state_rx: watch::Receiver<ScanState>,
    // Keeps the background scanner alive for the lifetime of the worktree.
    _background_scanner_task: Option<Task<()>>,
    // In-flight re-poll scheduled while a scan is running.
    poll_task: Option<Task<()>>,
    // Present while this worktree is shared with collaborators.
    share: Option<ShareState>,
    // Weak handles to buffers opened from this tree, keyed by buffer id.
    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
    // Strong handles to buffers guests have open, per peer — keeps them alive
    // while shared.
    shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
    // Connected collaborators and the replica id each edits under.
    peers: HashMap<PeerId, ReplicaId>,
    languages: Arc<LanguageRegistry>,
    fs: Arc<dyn Fs>,
}
652
653impl LocalWorktree {
    /// Builds the worktree model for `path` and returns it along with the
    /// sender the background scanner should report `ScanState`s through.
    /// The scanner itself is started separately (see `Worktree::open_local`).
    async fn new(
        path: impl Into<Arc<Path>>,
        languages: Arc<LanguageRegistry>,
        fs: Arc<dyn Fs>,
        cx: &mut AsyncAppContext,
    ) -> Result<(ModelHandle<Worktree>, Sender<ScanState>)> {
        let abs_path = path.into();
        // Entries are stored relative to the root, which is the empty path.
        let path: Arc<Path> = Arc::from(Path::new(""));
        let next_entry_id = AtomicUsize::new(0);

        // After determining whether the root entry is a file or a directory, populate the
        // snapshot's "root name", which will be used for the purpose of fuzzy matching.
        let root_name = abs_path
            .file_name()
            .map_or(String::new(), |f| f.to_string_lossy().to_string());
        let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
        let metadata = fs.metadata(&abs_path).await?;

        let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
        let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
        let tree = cx.add_model(move |cx: &mut ModelContext<Worktree>| {
            let mut snapshot = Snapshot {
                id: cx.model_id(),
                scan_id: 0,
                abs_path,
                root_name,
                root_char_bag,
                ignores: Default::default(),
                entries_by_path: Default::default(),
                entries_by_id: Default::default(),
                removed_entry_ids: Default::default(),
                next_entry_id: Arc::new(next_entry_id),
            };
            // Seed the snapshot with the root entry if the path exists.
            if let Some(metadata) = metadata {
                snapshot.insert_entry(Entry::new(
                    path.into(),
                    &metadata,
                    &snapshot.next_entry_id,
                    snapshot.root_char_bag,
                ));
            }

            let tree = Self {
                snapshot: snapshot.clone(),
                background_snapshot: Arc::new(Mutex::new(snapshot)),
                last_scan_state_rx,
                _background_scanner_task: None,
                share: None,
                poll_task: None,
                open_buffers: Default::default(),
                shared_buffers: Default::default(),
                peers: Default::default(),
                languages,
                fs,
            };

            // Forward scan states into the watch channel, poll the snapshot on
            // each one, and — once scanning completes on a shared tree — queue
            // the finished snapshot for broadcast to collaborators. Holds only
            // a weak handle so it ends with the worktree.
            cx.spawn_weak(|this, mut cx| async move {
                while let Ok(scan_state) = scan_states_rx.recv().await {
                    if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
                        let to_send = handle.update(&mut cx, |this, cx| {
                            last_scan_state_tx.blocking_send(scan_state).ok();
                            this.poll_snapshot(cx);
                            let tree = this.as_local_mut().unwrap();
                            if !tree.is_scanning() {
                                if let Some(share) = tree.share.as_ref() {
                                    Some((tree.snapshot(), share.snapshots_tx.clone()))
                                } else {
                                    None
                                }
                            } else {
                                None
                            }
                        });

                        if let Some((snapshot, snapshots_to_send_tx)) = to_send {
                            if let Err(err) = snapshots_to_send_tx.send(snapshot).await {
                                log::error!("error submitting snapshot to send {}", err);
                            }
                        }
                    } else {
                        break;
                    }
                }
            })
            .detach();

            Worktree::Local(tree)
        });

        Ok((tree, scan_states_tx))
    }
745
    /// Opens a buffer for `path`, reusing an already-open buffer for the same
    /// file when one exists; otherwise loads the file from disk and registers
    /// the new buffer in `open_buffers`.
    pub fn open_buffer(
        &mut self,
        path: &Path,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        let handle = cx.handle();

        // If there is already a buffer for the given path, then return it.
        // `retain` doubles as garbage collection for dead weak handles.
        let mut existing_buffer = None;
        self.open_buffers.retain(|_buffer_id, buffer| {
            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                if let Some(file) = buffer.read(cx.as_ref()).file() {
                    if file.worktree_id() == handle.id() && file.path.as_ref() == path {
                        existing_buffer = Some(buffer);
                    }
                }
                true
            } else {
                false
            }
        });

        let languages = self.languages.clone();
        let path = Arc::from(path);
        cx.spawn(|this, mut cx| async move {
            if let Some(existing_buffer) = existing_buffer {
                Ok(existing_buffer)
            } else {
                let (file, contents) = this
                    .update(&mut cx, |this, cx| this.as_local().unwrap().load(&path, cx))
                    .await?;
                // Language is selected from the path (e.g. by extension).
                let language = languages.select_language(&path).cloned();
                let buffer = cx.add_model(|cx| {
                    Buffer::from_history(0, History::new(contents.into()), Some(file), language, cx)
                });
                this.update(&mut cx, |this, _| {
                    let this = this
                        .as_local_mut()
                        .ok_or_else(|| anyhow!("must be a local worktree"))?;
                    this.open_buffers.insert(buffer.id(), buffer.downgrade());
                    Ok(buffer)
                })
            }
        })
    }
791
    /// Opens a buffer on behalf of a guest peer, records a strong handle in
    /// `shared_buffers` (keeping it alive for the peer), and serializes the
    /// buffer into an `OpenBufferResponse`.
    pub fn open_remote_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::OpenBuffer>,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<proto::OpenBufferResponse>> {
        let peer_id = envelope.original_sender_id();
        let path = Path::new(&envelope.payload.path);

        let buffer = self.open_buffer(path, cx);

        cx.spawn(|this, mut cx| async move {
            let buffer = buffer.await?;
            this.update(&mut cx, |this, cx| {
                this.as_local_mut()
                    .unwrap()
                    .shared_buffers
                    .entry(peer_id?)
                    .or_default()
                    .insert(buffer.id() as u64, buffer.clone());

                Ok(proto::OpenBufferResponse {
                    buffer: Some(buffer.update(cx.as_mut(), |buffer, cx| buffer.to_proto(cx))),
                })
            })
        })
    }
818
819 pub fn close_remote_buffer(
820 &mut self,
821 envelope: TypedEnvelope<proto::CloseBuffer>,
822 cx: &mut ModelContext<Worktree>,
823 ) -> Result<()> {
824 if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
825 shared_buffers.remove(&envelope.payload.buffer_id);
826 cx.notify();
827 }
828
829 Ok(())
830 }
831
832 pub fn add_peer(
833 &mut self,
834 envelope: TypedEnvelope<proto::AddPeer>,
835 cx: &mut ModelContext<Worktree>,
836 ) -> Result<()> {
837 let peer = envelope
838 .payload
839 .peer
840 .as_ref()
841 .ok_or_else(|| anyhow!("empty peer"))?;
842 self.peers
843 .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
844 cx.notify();
845
846 Ok(())
847 }
848
849 pub fn remove_peer(
850 &mut self,
851 envelope: TypedEnvelope<proto::RemovePeer>,
852 cx: &mut ModelContext<Worktree>,
853 ) -> Result<()> {
854 let peer_id = PeerId(envelope.payload.peer_id);
855 let replica_id = self
856 .peers
857 .remove(&peer_id)
858 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
859 self.shared_buffers.remove(&peer_id);
860 for (_, buffer) in &self.open_buffers {
861 if let Some(buffer) = buffer.upgrade(cx) {
862 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
863 }
864 }
865 cx.notify();
866
867 Ok(())
868 }
869
870 pub fn scan_complete(&self) -> impl Future<Output = ()> {
871 let mut scan_state_rx = self.last_scan_state_rx.clone();
872 async move {
873 let mut scan_state = Some(scan_state_rx.borrow().clone());
874 while let Some(ScanState::Scanning) = scan_state {
875 scan_state = scan_state_rx.recv().await;
876 }
877 }
878 }
879
880 fn is_scanning(&self) -> bool {
881 if let ScanState::Scanning = *self.last_scan_state_rx.borrow() {
882 true
883 } else {
884 false
885 }
886 }
887
    /// Clones the current foreground snapshot.
    pub fn snapshot(&self) -> Snapshot {
        self.snapshot.clone()
    }
891
    /// The absolute filesystem path of the worktree root.
    pub fn abs_path(&self) -> &Path {
        self.snapshot.abs_path.as_ref()
    }
895
    /// Whether the given absolute path lies within this worktree's root.
    pub fn contains_abs_path(&self, path: &Path) -> bool {
        path.starts_with(&self.snapshot.abs_path)
    }
899
900 fn absolutize(&self, path: &Path) -> PathBuf {
901 if path.file_name().is_some() {
902 self.snapshot.abs_path.join(path)
903 } else {
904 self.snapshot.abs_path.to_path_buf()
905 }
906 }
907
    /// Reads the file at `path` from disk, eagerly refreshing its entry in the
    /// background snapshot so the returned `File` has fresh metadata.
    fn load(&self, path: &Path, cx: &mut ModelContext<Worktree>) -> Task<Result<(File, String)>> {
        let handle = cx.handle();
        let path = Arc::from(path);
        let abs_path = self.absolutize(&path);
        let background_snapshot = self.background_snapshot.clone();
        let fs = self.fs.clone();
        cx.spawn(|this, mut cx| async move {
            let text = fs.load(&abs_path).await?;
            // Eagerly populate the snapshot with an updated entry for the loaded file
            let entry = refresh_entry(fs.as_ref(), &background_snapshot, path, &abs_path).await?;
            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
            Ok((File::new(entry.id, handle, entry.path, entry.mtime), text))
        })
    }
922
    /// Saves `text` to a new `path` ("save as"), registers `buffer` under that
    /// path in `open_buffers`, and returns the `File` now backing it.
    pub fn save_buffer_as(
        &self,
        buffer: ModelHandle<Buffer>,
        path: impl Into<Arc<Path>>,
        text: Rope,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<File>> {
        let save = self.save(path, text, cx);
        cx.spawn(|this, mut cx| async move {
            let entry = save.await?;
            this.update(&mut cx, |this, cx| {
                this.as_local_mut()
                    .unwrap()
                    .open_buffers
                    .insert(buffer.id(), buffer.downgrade());
                Ok(File::new(entry.id, cx.handle(), entry.path, entry.mtime))
            })
        })
    }
942
    /// Writes `text` to disk at the worktree-relative `path`, refreshes the
    /// file's entry in the background snapshot, and polls the model so the
    /// foreground snapshot picks up the change.
    fn save(
        &self,
        path: impl Into<Arc<Path>>,
        text: Rope,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<Entry>> {
        let path = path.into();
        let abs_path = self.absolutize(&path);
        let background_snapshot = self.background_snapshot.clone();
        let fs = self.fs.clone();
        // Disk I/O happens on the background executor.
        let save = cx.background().spawn(async move {
            fs.save(&abs_path, &text).await?;
            refresh_entry(fs.as_ref(), &background_snapshot, path.clone(), &abs_path).await
        });

        cx.spawn(|this, mut cx| async move {
            let entry = save.await?;
            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
            Ok(entry)
        })
    }
964
    /// Shares this worktree with collaborators: registers it with the server,
    /// starts a task that streams snapshot diffs to peers, and subscribes the
    /// model to collaboration messages. Returns the server-assigned worktree
    /// id and the access token guests use to join.
    pub fn share(
        &mut self,
        rpc: rpc::Client,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<anyhow::Result<(u64, String)>> {
        let snapshot = self.snapshot();
        let share_request = self.share_request(cx);
        cx.spawn(|this, mut cx| async move {
            let share_request = share_request.await;
            let share_response = rpc.request(share_request).await?;
            let remote_id = share_response.worktree_id;

            log::info!("sharing worktree {:?}", share_response);
            let (snapshots_to_send_tx, snapshots_to_send_rx) =
                smol::channel::unbounded::<Snapshot>();

            // Diff each queued snapshot against the last one successfully
            // sent, so peers receive incremental updates.
            cx.background()
                .spawn({
                    let rpc = rpc.clone();
                    async move {
                        let mut prev_snapshot = snapshot;
                        while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
                            let message = snapshot.build_update(&prev_snapshot, remote_id);
                            match rpc.send(message).await {
                                Ok(()) => prev_snapshot = snapshot,
                                Err(err) => log::error!("error sending snapshot diff {}", err),
                            }
                        }
                    }
                })
                .detach();

            this.update(&mut cx, |worktree, cx| {
                // Subscriptions live in `ShareState` and are dropped when
                // sharing stops.
                let _subscriptions = vec![
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_add_peer),
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_remove_peer),
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_open_buffer),
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_close_buffer),
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_update_buffer),
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_save_buffer),
                ];

                let worktree = worktree.as_local_mut().unwrap();
                worktree.share = Some(ShareState {
                    rpc,
                    remote_id: share_response.worktree_id,
                    snapshots_tx: snapshots_to_send_tx,
                    _subscriptions,
                });
            });

            Ok((remote_id, share_response.access_token))
        })
    }
1019
1020 fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<proto::ShareWorktree> {
1021 let snapshot = self.snapshot();
1022 let root_name = self.root_name.clone();
1023 cx.background().spawn(async move {
1024 let entries = snapshot
1025 .entries_by_path
1026 .cursor::<(), ()>()
1027 .map(Into::into)
1028 .collect();
1029 proto::ShareWorktree {
1030 worktree: Some(proto::Worktree { root_name, entries }),
1031 }
1032 })
1033 }
1034}
1035
1036pub fn refresh_buffer(abs_path: PathBuf, fs: &Arc<dyn Fs>, cx: &mut ModelContext<Buffer>) {
1037 let fs = fs.clone();
1038 cx.spawn(|buffer, mut cx| async move {
1039 let new_text = fs.load(&abs_path).await;
1040 match new_text {
1041 Err(error) => log::error!("error refreshing buffer after file changed: {}", error),
1042 Ok(new_text) => {
1043 buffer
1044 .update(&mut cx, |buffer, cx| {
1045 buffer.set_text_from_disk(new_text.into(), cx)
1046 })
1047 .await;
1048 }
1049 }
1050 })
1051 .detach()
1052}
1053
// Dereferences to the foreground snapshot for convenient read access.
impl Deref for LocalWorktree {
    type Target = Snapshot;

    fn deref(&self) -> &Self::Target {
        &self.snapshot
    }
}
1061
// Debug output delegates entirely to the snapshot's representation.
impl fmt::Debug for LocalWorktree {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.snapshot.fmt(f)
    }
}
1067
/// State held by a local worktree while it is being shared with guests.
struct ShareState {
    rpc: rpc::Client,
    // Server-assigned id for this shared worktree.
    remote_id: u64,
    // Queue of completed snapshots to be diffed and broadcast to peers.
    snapshots_tx: Sender<Snapshot>,
    // Dropping these unregisters the collaboration message handlers.
    _subscriptions: Vec<rpc::Subscription>,
}
1074
/// A worktree mirrored from a collaborator's machine over RPC.
pub struct RemoteWorktree {
    // Server-assigned id of the worktree on the host.
    remote_id: u64,
    // Foreground copy; refreshed from `snapshot_rx` by `Worktree::poll_snapshot`.
    snapshot: Snapshot,
    // Latest snapshot produced by the background update-application task.
    snapshot_rx: watch::Receiver<Snapshot>,
    rpc: rpc::Client,
    // Raw protocol updates queued for background application.
    updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
    // The replica id this guest edits under.
    replica_id: ReplicaId,
    // Loaded buffers or operations queued for buffers still loading.
    open_buffers: HashMap<usize, RemoteBuffer>,
    // Other collaborators and the replica id each edits under.
    peers: HashMap<PeerId, ReplicaId>,
    languages: Arc<LanguageRegistry>,
    // Dropping these unregisters the collaboration message handlers.
    _subscriptions: Vec<rpc::Subscription>,
}
1087
1088impl RemoteWorktree {
    /// Opens a buffer for `path` on this guest: reuses a live buffer for the
    /// same file if present; otherwise requests the buffer from the host,
    /// reconstructs it locally, and applies any operations that arrived while
    /// it was loading.
    pub fn open_buffer(
        &mut self,
        path: &Path,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        let handle = cx.handle();
        // Look for an already-open buffer for this path, pruning dead weak
        // handles along the way.
        let mut existing_buffer = None;
        self.open_buffers.retain(|_buffer_id, buffer| {
            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                if let Some(file) = buffer.read(cx.as_ref()).file() {
                    if file.worktree_id() == handle.id() && file.path.as_ref() == path {
                        existing_buffer = Some(buffer);
                    }
                }
                true
            } else {
                false
            }
        });

        let rpc = self.rpc.clone();
        let languages = self.languages.clone();
        let replica_id = self.replica_id;
        let remote_worktree_id = self.remote_id;
        let path = path.to_string_lossy().to_string();
        cx.spawn(|this, mut cx| async move {
            if let Some(existing_buffer) = existing_buffer {
                Ok(existing_buffer)
            } else {
                // The file must already exist in the mirrored snapshot.
                let entry = this
                    .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned())
                    .ok_or_else(|| anyhow!("file does not exist"))?;
                let file = File::new(entry.id, handle, entry.path, entry.mtime);
                let language = languages.select_language(&path).cloned();
                let response = rpc
                    .request(proto::OpenBuffer {
                        worktree_id: remote_worktree_id as u64,
                        path,
                    })
                    .await?;
                let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
                let buffer_id = remote_buffer.id as usize;
                let buffer = cx.add_model(|cx| {
                    Buffer::from_proto(replica_id, remote_buffer, Some(file), language, cx).unwrap()
                });
                this.update(&mut cx, |this, cx| {
                    let this = this.as_remote_mut().unwrap();
                    // Replace any queued operations with the loaded buffer and
                    // apply them so no edits are lost.
                    if let Some(RemoteBuffer::Operations(pending_ops)) = this
                        .open_buffers
                        .insert(buffer_id, RemoteBuffer::Loaded(buffer.downgrade()))
                    {
                        buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?;
                    }
                    Result::<_, anyhow::Error>::Ok(())
                })?;
                Ok(buffer)
            }
        })
    }
1148
    /// Clones the current foreground snapshot.
    fn snapshot(&self) -> Snapshot {
        self.snapshot.clone()
    }
1152
1153 fn update_from_remote(
1154 &mut self,
1155 envelope: TypedEnvelope<proto::UpdateWorktree>,
1156 cx: &mut ModelContext<Worktree>,
1157 ) -> Result<()> {
1158 let mut tx = self.updates_tx.clone();
1159 let payload = envelope.payload.clone();
1160 cx.background()
1161 .spawn(async move {
1162 tx.send(payload).await.expect("receiver runs to completion");
1163 })
1164 .detach();
1165
1166 Ok(())
1167 }
1168
1169 pub fn add_peer(
1170 &mut self,
1171 envelope: TypedEnvelope<proto::AddPeer>,
1172 cx: &mut ModelContext<Worktree>,
1173 ) -> Result<()> {
1174 let peer = envelope
1175 .payload
1176 .peer
1177 .as_ref()
1178 .ok_or_else(|| anyhow!("empty peer"))?;
1179 self.peers
1180 .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
1181 cx.notify();
1182 Ok(())
1183 }
1184
1185 pub fn remove_peer(
1186 &mut self,
1187 envelope: TypedEnvelope<proto::RemovePeer>,
1188 cx: &mut ModelContext<Worktree>,
1189 ) -> Result<()> {
1190 let peer_id = PeerId(envelope.payload.peer_id);
1191 let replica_id = self
1192 .peers
1193 .remove(&peer_id)
1194 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
1195 for (_, buffer) in &self.open_buffers {
1196 if let Some(buffer) = buffer.upgrade(cx) {
1197 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
1198 }
1199 }
1200 cx.notify();
1201 Ok(())
1202 }
1203}
1204
/// State of a buffer belonging to a remote worktree: either operations are
/// being queued while the buffer is still opening, or the buffer is loaded
/// and held by a weak handle.
enum RemoteBuffer {
    // Operations received for a buffer that has not finished opening yet.
    Operations(Vec<Operation>),
    // A buffer that has been opened locally.
    Loaded(WeakModelHandle<Buffer>),
}
1209
1210impl RemoteBuffer {
1211 fn upgrade(&self, cx: &impl UpgradeModelHandle) -> Option<ModelHandle<Buffer>> {
1212 match self {
1213 Self::Operations(_) => None,
1214 Self::Loaded(buffer) => buffer.upgrade(cx),
1215 }
1216 }
1217}
1218
/// A point-in-time view of a worktree's file system state.
///
/// Cloning is cheap: the entry trees are persistent data structures and the
/// remaining fields are small or reference-counted.
#[derive(Clone)]
pub struct Snapshot {
    /// Id of the worktree this snapshot belongs to.
    id: usize,
    /// Incremented for each batch of applied changes; used to detect which
    /// entries differ between two snapshots.
    scan_id: usize,
    /// Absolute path of the worktree root on disk.
    abs_path: Arc<Path>,
    root_name: String,
    root_char_bag: CharBag,
    /// Parsed `.gitignore` per directory, tagged with the scan id at which
    /// it was last (re)loaded.
    ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
    /// Primary index: entries ordered by worktree-relative path.
    entries_by_path: SumTree<Entry>,
    /// Secondary index: entry id -> path, ordered by id.
    entries_by_id: SumTree<PathEntry>,
    /// Ids of entries removed in the current event batch, keyed by inode,
    /// so ids stay stable across renames (cleared after each batch).
    removed_entry_ids: HashMap<u64, usize>,
    next_entry_id: Arc<AtomicUsize>,
}
1232
impl Snapshot {
    /// The id of the worktree this snapshot belongs to.
    pub fn id(&self) -> usize {
        self.id
    }

    /// Builds a protobuf update that transforms `other` (an older snapshot)
    /// into `self`, for transmission to remote collaborators.
    ///
    /// Both `entries_by_id` trees are walked in ascending id order: ids
    /// present only in `self` become updated entries, ids present only in
    /// `other` become removals, and ids present in both are re-sent only
    /// when their `scan_id`s differ (i.e. the entry changed between scans).
    pub fn build_update(&self, other: &Self, worktree_id: u64) -> proto::UpdateWorktree {
        let mut updated_entries = Vec::new();
        let mut removed_entries = Vec::new();
        let mut self_entries = self.entries_by_id.cursor::<(), ()>().peekable();
        let mut other_entries = other.entries_by_id.cursor::<(), ()>().peekable();
        loop {
            match (self_entries.peek(), other_entries.peek()) {
                (Some(self_entry), Some(other_entry)) => match self_entry.id.cmp(&other_entry.id) {
                    Ordering::Less => {
                        let entry = self.entry_for_id(self_entry.id).unwrap().into();
                        updated_entries.push(entry);
                        self_entries.next();
                    }
                    Ordering::Equal => {
                        if self_entry.scan_id != other_entry.scan_id {
                            let entry = self.entry_for_id(self_entry.id).unwrap().into();
                            updated_entries.push(entry);
                        }

                        self_entries.next();
                        other_entries.next();
                    }
                    Ordering::Greater => {
                        removed_entries.push(other_entry.id as u64);
                        other_entries.next();
                    }
                },
                (Some(self_entry), None) => {
                    let entry = self.entry_for_id(self_entry.id).unwrap().into();
                    updated_entries.push(entry);
                    self_entries.next();
                }
                (None, Some(other_entry)) => {
                    removed_entries.push(other_entry.id as u64);
                    other_entries.next();
                }
                (None, None) => break,
            }
        }

        proto::UpdateWorktree {
            updated_entries,
            removed_entries,
            worktree_id,
        }
    }

    /// Applies an update received from the remote host: removes and inserts
    /// entries in both trees and bumps this snapshot's `scan_id`.
    fn apply_update(&mut self, update: proto::UpdateWorktree) -> Result<()> {
        self.scan_id += 1;
        let scan_id = self.scan_id;

        let mut entries_by_path_edits = Vec::new();
        let mut entries_by_id_edits = Vec::new();
        for entry_id in update.removed_entries {
            let entry_id = entry_id as usize;
            let entry = self
                .entry_for_id(entry_id)
                .ok_or_else(|| anyhow!("unknown entry"))?;
            entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
            entries_by_id_edits.push(Edit::Remove(entry.id));
        }

        for entry in update.updated_entries {
            let entry = Entry::try_from((&self.root_char_bag, entry))?;
            // If an entry with this id already exists at a different path
            // (a rename), remove the stale path record before inserting.
            if let Some(PathEntry { path, .. }) = self.entries_by_id.get(&entry.id, &()) {
                entries_by_path_edits.push(Edit::Remove(PathKey(path.clone())));
            }
            entries_by_id_edits.push(Edit::Insert(PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                scan_id,
            }));
            entries_by_path_edits.push(Edit::Insert(entry));
        }

        self.entries_by_path.edit(entries_by_path_edits, &());
        self.entries_by_id.edit(entries_by_id_edits, &());

        Ok(())
    }

    /// Total number of file entries, including ignored files.
    pub fn file_count(&self) -> usize {
        self.entries_by_path.summary().file_count
    }

    /// Number of file entries that are not excluded by gitignore rules.
    pub fn visible_file_count(&self) -> usize {
        self.entries_by_path.summary().visible_file_count
    }

    /// Iterates over all file entries, starting at the `start`th file.
    pub fn files(&self, start: usize) -> FileIter {
        FileIter::all(self, start)
    }

    /// Iterates over the paths of all entries, excluding the root entry
    /// (whose relative path is empty).
    pub fn paths(&self) -> impl Iterator<Item = &Arc<Path>> {
        let empty_path = Path::new("");
        self.entries_by_path
            .cursor::<(), ()>()
            .filter(move |entry| entry.path.as_ref() != empty_path)
            .map(|entry| &entry.path)
    }

    /// Iterates over non-ignored file entries, starting at the `start`th
    /// visible file.
    pub fn visible_files(&self, start: usize) -> FileIter {
        FileIter::visible(self, start)
    }

    /// Iterates over the immediate children of the directory at `path`.
    fn child_entries<'a>(&'a self, path: &'a Path) -> ChildEntriesIter<'a> {
        ChildEntriesIter::new(path, self)
    }

    /// The entry for the worktree root, if one has been recorded.
    pub fn root_entry(&self) -> Option<&Entry> {
        self.entry_for_path("")
    }

    /// The file name of the worktree root.
    pub fn root_name(&self) -> &str {
        &self.root_name
    }

    /// Looks up the entry at the given worktree-relative path.
    fn entry_for_path(&self, path: impl AsRef<Path>) -> Option<&Entry> {
        let mut cursor = self.entries_by_path.cursor::<_, ()>();
        if cursor.seek(&PathSearch::Exact(path.as_ref()), Bias::Left, &()) {
            cursor.item()
        } else {
            None
        }
    }

    /// Looks up an entry by id, resolving it via its recorded path.
    fn entry_for_id(&self, id: usize) -> Option<&Entry> {
        let entry = self.entries_by_id.get(&id, &())?;
        self.entry_for_path(&entry.path)
    }

    /// The inode of the entry at `path`, if one exists.
    pub fn inode_for_path(&self, path: impl AsRef<Path>) -> Option<u64> {
        self.entry_for_path(path.as_ref()).map(|e| e.inode)
    }

    /// Inserts (or replaces) a single entry in both trees, reusing the id
    /// of any entry previously stored for the same inode or path.
    ///
    /// If the entry is a `.gitignore` file, its rules are (re)parsed and
    /// registered for the containing directory.
    fn insert_entry(&mut self, mut entry: Entry) -> Entry {
        if !entry.is_dir() && entry.path.file_name() == Some(&GITIGNORE) {
            let (ignore, err) = Gitignore::new(self.abs_path.join(&entry.path));
            if let Some(err) = err {
                log::error!("error in ignore file {:?} - {:?}", &entry.path, err);
            }

            let ignore_dir_path = entry.path.parent().unwrap();
            self.ignores
                .insert(ignore_dir_path.into(), (Arc::new(ignore), self.scan_id));
        }

        self.reuse_entry_id(&mut entry);
        self.entries_by_path.insert_or_replace(entry.clone(), &());
        self.entries_by_id.insert_or_replace(
            PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                scan_id: self.scan_id,
            },
            &(),
        );
        entry
    }

    /// Records the children of a newly-scanned directory, transitioning the
    /// parent from `PendingDir` to `Dir` and registering its `.gitignore`,
    /// if one was found during the scan.
    fn populate_dir(
        &mut self,
        parent_path: Arc<Path>,
        entries: impl IntoIterator<Item = Entry>,
        ignore: Option<Arc<Gitignore>>,
    ) {
        let mut parent_entry = self
            .entries_by_path
            .get(&PathKey(parent_path.clone()), &())
            .unwrap()
            .clone();
        if let Some(ignore) = ignore {
            self.ignores.insert(parent_path, (ignore, self.scan_id));
        }
        // Only a pending directory may be populated; anything else means a
        // scan-ordering invariant has been violated.
        if matches!(parent_entry.kind, EntryKind::PendingDir) {
            parent_entry.kind = EntryKind::Dir;
        } else {
            unreachable!();
        }

        let mut entries_by_path_edits = vec![Edit::Insert(parent_entry)];
        let mut entries_by_id_edits = Vec::new();

        for mut entry in entries {
            self.reuse_entry_id(&mut entry);
            entries_by_id_edits.push(Edit::Insert(PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                scan_id: self.scan_id,
            }));
            entries_by_path_edits.push(Edit::Insert(entry));
        }

        self.entries_by_path.edit(entries_by_path_edits, &());
        self.entries_by_id.edit(entries_by_id_edits, &());
    }

    /// Keeps entry ids stable across scans: prefer the id of an entry that
    /// was removed with the same inode (a rename), otherwise the id of the
    /// existing entry at the same path.
    fn reuse_entry_id(&mut self, entry: &mut Entry) {
        if let Some(removed_entry_id) = self.removed_entry_ids.remove(&entry.inode) {
            entry.id = removed_entry_id;
        } else if let Some(existing_entry) = self.entry_for_path(&entry.path) {
            entry.id = existing_entry.id;
        }
    }

    /// Removes the entry at `path` along with all of its descendants,
    /// remembering removed ids by inode so later re-insertions can detect
    /// renames.
    fn remove_path(&mut self, path: &Path) {
        let mut new_entries;
        let removed_entry_ids;
        {
            // Everything before `path` stays, the slice from `path` up to
            // its successor (i.e. `path` and all descendants) is removed,
            // and the suffix is re-appended.
            let mut cursor = self.entries_by_path.cursor::<_, ()>();
            new_entries = cursor.slice(&PathSearch::Exact(path), Bias::Left, &());
            removed_entry_ids = cursor.slice(&PathSearch::Successor(path), Bias::Left, &());
            new_entries.push_tree(cursor.suffix(&()), &());
        }
        self.entries_by_path = new_entries;

        let mut entries_by_id_edits = Vec::new();
        for entry in removed_entry_ids.cursor::<(), ()>() {
            // Keep the highest removed id per inode for later reuse.
            let removed_entry_id = self
                .removed_entry_ids
                .entry(entry.inode)
                .or_insert(entry.id);
            *removed_entry_id = cmp::max(*removed_entry_id, entry.id);
            entries_by_id_edits.push(Edit::Remove(entry.id));
        }
        self.entries_by_id.edit(entries_by_id_edits, &());

        if path.file_name() == Some(&GITIGNORE) {
            // Mark the parent directory's ignore as touched in this scan so
            // ignore statuses get recomputed.
            if let Some((_, scan_id)) = self.ignores.get_mut(path.parent().unwrap()) {
                *scan_id = self.scan_id;
            }
        }
    }

    /// Builds the stack of `.gitignore`s that applies to `path`, walking
    /// from the root downward. Collapses to `IgnoreStack::all` as soon as
    /// any ancestor (or the path itself) is determined to be ignored.
    fn ignore_stack_for_path(&self, path: &Path, is_dir: bool) -> Arc<IgnoreStack> {
        let mut new_ignores = Vec::new();
        for ancestor in path.ancestors().skip(1) {
            if let Some((ignore, _)) = self.ignores.get(ancestor) {
                new_ignores.push((ancestor, Some(ignore.clone())));
            } else {
                new_ignores.push((ancestor, None));
            }
        }

        let mut ignore_stack = IgnoreStack::none();
        for (parent_path, ignore) in new_ignores.into_iter().rev() {
            if ignore_stack.is_path_ignored(&parent_path, true) {
                ignore_stack = IgnoreStack::all();
                break;
            } else if let Some(ignore) = ignore {
                ignore_stack = ignore_stack.append(Arc::from(parent_path), ignore);
            }
        }

        if ignore_stack.is_path_ignored(path, is_dir) {
            ignore_stack = IgnoreStack::all();
        }

        ignore_stack
    }
}
1499
1500impl fmt::Debug for Snapshot {
1501 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1502 for entry in self.entries_by_path.cursor::<(), ()>() {
1503 for _ in entry.path.ancestors().skip(1) {
1504 write!(f, " ")?;
1505 }
1506 writeln!(f, "{:?} (inode: {})", entry.path, entry.inode)?;
1507 }
1508 Ok(())
1509 }
1510}
1511
/// A handle to a file within a worktree, held by open buffers.
#[derive(Clone, PartialEq)]
pub struct File {
    /// `None` once the underlying entry has been deleted from the worktree.
    entry_id: Option<usize>,
    worktree: ModelHandle<Worktree>,
    /// Path relative to the worktree root.
    pub path: Arc<Path>,
    /// Last-known modification time of the file.
    pub mtime: SystemTime,
}
1519
impl File {
    /// Creates a handle to an existing worktree entry.
    pub fn new(
        entry_id: usize,
        worktree: ModelHandle<Worktree>,
        path: Arc<Path>,
        mtime: SystemTime,
    ) -> Self {
        Self {
            entry_id: Some(entry_id),
            worktree,
            path,
            mtime,
        }
    }

    /// Forwards a buffer operation to collaborators: over the share channel
    /// when this is a shared local worktree, or to the host when the
    /// worktree is remote. Send failures are logged and otherwise ignored.
    pub fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) {
        self.worktree.update(cx, |worktree, cx| {
            if let Some((rpc, remote_id)) = match worktree {
                Worktree::Local(worktree) => worktree
                    .share
                    .as_ref()
                    .map(|share| (share.rpc.clone(), share.remote_id)),
                Worktree::Remote(worktree) => Some((worktree.rpc.clone(), worktree.remote_id)),
            } {
                cx.spawn(|_, _| async move {
                    if let Err(error) = rpc
                        .send(proto::UpdateBuffer {
                            worktree_id: remote_id,
                            buffer_id,
                            operations: Some(operation).iter().map(Into::into).collect(),
                        })
                        .await
                    {
                        log::error!("error sending buffer operation: {}", error);
                    }
                })
                .detach();
            }
        });
    }

    /// Notifies the host that a buffer was closed locally. Only relevant
    /// for remote worktrees; send failures are logged.
    pub fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) {
        self.worktree.update(cx, |worktree, cx| {
            if let Worktree::Remote(worktree) = worktree {
                let worktree_id = worktree.remote_id;
                let rpc = worktree.rpc.clone();
                cx.background()
                    .spawn(async move {
                        if let Err(error) = rpc
                            .send(proto::CloseBuffer {
                                worktree_id,
                                buffer_id,
                            })
                            .await
                        {
                            log::error!("error closing remote buffer: {}", error);
                        };
                    })
                    .detach();
            }
        });
    }

    /// Returns this file's path relative to the root of its worktree.
    pub fn path(&self) -> Arc<Path> {
        self.path.clone()
    }

    /// Returns this file's absolute path on disk.
    pub fn abs_path(&self, cx: &AppContext) -> PathBuf {
        self.worktree.read(cx).abs_path.join(&self.path)
    }

    /// Returns the last component of this handle's absolute path. If this handle refers to the root
    /// of its worktree, then this method will return the name of the worktree itself.
    pub fn file_name<'a>(&'a self, cx: &'a AppContext) -> Option<OsString> {
        self.path
            .file_name()
            .or_else(|| Some(OsStr::new(self.worktree.read(cx).root_name())))
            .map(Into::into)
    }

    /// Whether the underlying entry has been deleted from the worktree.
    pub fn is_deleted(&self) -> bool {
        self.entry_id.is_none()
    }

    /// Whether the underlying entry still exists in the worktree.
    pub fn exists(&self) -> bool {
        !self.is_deleted()
    }

    /// Saves the given text to this file.
    ///
    /// On a local worktree the text is written to disk and, if the tree is
    /// shared, a `BufferSaved` message is broadcast. On a remote worktree a
    /// `SaveBuffer` request is sent to the host, which performs the write.
    /// Resolves to the saved version vector and the resulting mtime.
    pub fn save(
        &self,
        buffer_id: u64,
        text: Rope,
        version: time::Global,
        cx: &mut MutableAppContext,
    ) -> Task<Result<(time::Global, SystemTime)>> {
        self.worktree.update(cx, |worktree, cx| match worktree {
            Worktree::Local(worktree) => {
                let rpc = worktree
                    .share
                    .as_ref()
                    .map(|share| (share.rpc.clone(), share.remote_id));
                let save = worktree.save(self.path.clone(), text, cx);
                cx.background().spawn(async move {
                    let entry = save.await?;
                    if let Some((rpc, worktree_id)) = rpc {
                        rpc.send(proto::BufferSaved {
                            worktree_id,
                            buffer_id,
                            version: (&version).into(),
                            mtime: Some(entry.mtime.into()),
                        })
                        .await?;
                    }
                    Ok((version, entry.mtime))
                })
            }
            Worktree::Remote(worktree) => {
                let rpc = worktree.rpc.clone();
                let worktree_id = worktree.remote_id;
                cx.background().spawn(async move {
                    let response = rpc
                        .request(proto::SaveBuffer {
                            worktree_id,
                            buffer_id,
                        })
                        .await?;
                    let version = response.version.try_into()?;
                    let mtime = response
                        .mtime
                        .ok_or_else(|| anyhow!("missing mtime"))?
                        .into();
                    Ok((version, mtime))
                })
            }
        })
    }

    /// The id of the worktree containing this file.
    pub fn worktree_id(&self) -> usize {
        self.worktree.id()
    }

    /// A key uniquely identifying this file: its worktree id plus its
    /// worktree-relative path.
    pub fn entry_id(&self) -> (usize, Arc<Path>) {
        (self.worktree.id(), self.path.clone())
    }
}
1666
/// A single file or directory within a worktree snapshot.
#[derive(Clone, Debug)]
pub struct Entry {
    /// Stable identifier, preserved across renames when possible.
    pub id: usize,
    pub kind: EntryKind,
    /// Path relative to the worktree root.
    pub path: Arc<Path>,
    pub inode: u64,
    pub mtime: SystemTime,
    pub is_symlink: bool,
    /// Whether the entry is excluded by a `.gitignore` rule.
    pub is_ignored: bool,
}
1677
/// Distinguishes directories (scanned or pending) from regular files.
#[derive(Clone, Debug)]
pub enum EntryKind {
    /// A directory that has been discovered but whose children have not
    /// been scanned yet.
    PendingDir,
    /// A fully-scanned directory.
    Dir,
    /// A regular file, carrying the character bag used for fuzzy matching.
    File(CharBag),
}
1684
1685impl Entry {
1686 fn new(
1687 path: Arc<Path>,
1688 metadata: &fs::Metadata,
1689 next_entry_id: &AtomicUsize,
1690 root_char_bag: CharBag,
1691 ) -> Self {
1692 Self {
1693 id: next_entry_id.fetch_add(1, SeqCst),
1694 kind: if metadata.is_dir {
1695 EntryKind::PendingDir
1696 } else {
1697 EntryKind::File(char_bag_for_path(root_char_bag, &path))
1698 },
1699 path,
1700 inode: metadata.inode,
1701 mtime: metadata.mtime,
1702 is_symlink: metadata.is_symlink,
1703 is_ignored: false,
1704 }
1705 }
1706
1707 pub fn is_dir(&self) -> bool {
1708 matches!(self.kind, EntryKind::Dir | EntryKind::PendingDir)
1709 }
1710
1711 pub fn is_file(&self) -> bool {
1712 matches!(self.kind, EntryKind::File(_))
1713 }
1714}
1715
1716impl sum_tree::Item for Entry {
1717 type Summary = EntrySummary;
1718
1719 fn summary(&self) -> Self::Summary {
1720 let file_count;
1721 let visible_file_count;
1722 if self.is_file() {
1723 file_count = 1;
1724 if self.is_ignored {
1725 visible_file_count = 0;
1726 } else {
1727 visible_file_count = 1;
1728 }
1729 } else {
1730 file_count = 0;
1731 visible_file_count = 0;
1732 }
1733
1734 EntrySummary {
1735 max_path: self.path.clone(),
1736 file_count,
1737 visible_file_count,
1738 }
1739 }
1740}
1741
1742impl sum_tree::KeyedItem for Entry {
1743 type Key = PathKey;
1744
1745 fn key(&self) -> Self::Key {
1746 PathKey(self.path.clone())
1747 }
1748}
1749
/// Aggregate data maintained for each node of the path-ordered entry tree.
#[derive(Clone, Debug)]
pub struct EntrySummary {
    /// Greatest (right-most) path in the summarized subtree.
    max_path: Arc<Path>,
    file_count: usize,
    /// Count of files not excluded by gitignore rules.
    visible_file_count: usize,
}
1756
1757impl Default for EntrySummary {
1758 fn default() -> Self {
1759 Self {
1760 max_path: Arc::from(Path::new("")),
1761 file_count: 0,
1762 visible_file_count: 0,
1763 }
1764 }
1765}
1766
1767impl sum_tree::Summary for EntrySummary {
1768 type Context = ();
1769
1770 fn add_summary(&mut self, rhs: &Self, _: &()) {
1771 self.max_path = rhs.max_path.clone();
1772 self.file_count += rhs.file_count;
1773 self.visible_file_count += rhs.visible_file_count;
1774 }
1775}
1776
/// An id-ordered record mapping an entry id back to its path.
#[derive(Clone, Debug)]
struct PathEntry {
    id: usize,
    path: Arc<Path>,
    /// The scan in which this entry was last modified.
    scan_id: usize,
}
1783
1784impl sum_tree::Item for PathEntry {
1785 type Summary = PathEntrySummary;
1786
1787 fn summary(&self) -> Self::Summary {
1788 PathEntrySummary { max_id: self.id }
1789 }
1790}
1791
1792impl sum_tree::KeyedItem for PathEntry {
1793 type Key = usize;
1794
1795 fn key(&self) -> Self::Key {
1796 self.id
1797 }
1798}
1799
/// Aggregate for the id-ordered tree: the largest entry id in a subtree.
#[derive(Clone, Debug, Default)]
struct PathEntrySummary {
    max_id: usize,
}
1804
1805impl sum_tree::Summary for PathEntrySummary {
1806 type Context = ();
1807
1808 fn add_summary(&mut self, summary: &Self, _: &Self::Context) {
1809 self.max_id = summary.max_id;
1810 }
1811}
1812
1813impl<'a> sum_tree::Dimension<'a, PathEntrySummary> for usize {
1814 fn add_summary(&mut self, summary: &'a PathEntrySummary, _: &()) {
1815 *self = summary.max_id;
1816 }
1817}
1818
/// Ordering key for `entries_by_path`: the entry's worktree-relative path.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct PathKey(Arc<Path>);
1821
1822impl Default for PathKey {
1823 fn default() -> Self {
1824 Self(Path::new("").into())
1825 }
1826}
1827
1828impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
1829 fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
1830 self.0 = summary.max_path.clone();
1831 }
1832}
1833
/// A seek target within the path-ordered entry tree.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum PathSearch<'a> {
    /// The position of the entry with exactly this path.
    Exact(&'a Path),
    /// The position just past this path and all of its descendants.
    Successor(&'a Path),
}
1839
impl<'a> Ord for PathSearch<'a> {
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        match (self, other) {
            // Two exact targets compare like plain paths.
            (Self::Exact(a), Self::Exact(b)) => a.cmp(b),
            // `Successor(a)` sorts after `a` itself and after every path
            // inside `a`, and otherwise compares like `a`.
            (Self::Successor(a), Self::Exact(b)) => {
                if b.starts_with(a) {
                    cmp::Ordering::Greater
                } else {
                    a.cmp(b)
                }
            }
            // NOTE(review): seeks appear to always compare a search target
            // (left) against stored `Exact` keys (right), so the remaining
            // combinations are presumed unreachable — confirm if the cursor
            // API changes.
            _ => unreachable!("not sure we need the other two cases"),
        }
    }
}
1855
1856impl<'a> PartialOrd for PathSearch<'a> {
1857 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
1858 Some(self.cmp(other))
1859 }
1860}
1861
1862impl<'a> Default for PathSearch<'a> {
1863 fn default() -> Self {
1864 Self::Exact(Path::new("").into())
1865 }
1866}
1867
1868impl<'a: 'b, 'b> sum_tree::Dimension<'a, EntrySummary> for PathSearch<'b> {
1869 fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
1870 *self = Self::Exact(summary.max_path.as_ref());
1871 }
1872}
1873
/// Seek dimension counting all file entries to the left of the cursor.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct FileCount(usize);
1876
1877impl<'a> sum_tree::Dimension<'a, EntrySummary> for FileCount {
1878 fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
1879 self.0 += summary.file_count;
1880 }
1881}
1882
/// Seek dimension counting non-ignored file entries to the left of the cursor.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct VisibleFileCount(usize);
1885
1886impl<'a> sum_tree::Dimension<'a, EntrySummary> for VisibleFileCount {
1887 fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
1888 self.0 += summary.visible_file_count;
1889 }
1890}
1891
/// Scans a worktree's directory tree on background threads and keeps the
/// shared snapshot up to date in response to file system events.
struct BackgroundScanner {
    fs: Arc<dyn Fs>,
    /// Snapshot shared with the foreground worktree.
    snapshot: Arc<Mutex<Snapshot>>,
    /// Channel for reporting scan progress back to the worktree.
    notify: Sender<ScanState>,
    executor: Arc<executor::Background>,
}
1898
impl BackgroundScanner {
    fn new(
        snapshot: Arc<Mutex<Snapshot>>,
        notify: Sender<ScanState>,
        fs: Arc<dyn Fs>,
        executor: Arc<executor::Background>,
    ) -> Self {
        Self {
            fs,
            snapshot,
            notify,
            executor,
        }
    }

    /// Absolute path of the worktree root.
    fn abs_path(&self) -> Arc<Path> {
        self.snapshot.lock().abs_path.clone()
    }

    /// A clone of the current shared snapshot.
    fn snapshot(&self) -> Snapshot {
        self.snapshot.lock().clone()
    }

    /// Main loop: performs the initial scan, then processes batches of file
    /// system events until either stream end or a failed notification send
    /// (which indicates the receiving side has been dropped).
    async fn run(mut self, events_rx: impl Stream<Item = Vec<fsevent::Event>>) {
        if self.notify.send(ScanState::Scanning).await.is_err() {
            return;
        }

        if let Err(err) = self.scan_dirs().await {
            if self
                .notify
                .send(ScanState::Err(Arc::new(err)))
                .await
                .is_err()
            {
                return;
            }
        }

        if self.notify.send(ScanState::Idle).await.is_err() {
            return;
        }

        futures::pin_mut!(events_rx);
        while let Some(events) = events_rx.next().await {
            if self.notify.send(ScanState::Scanning).await.is_err() {
                break;
            }

            if !self.process_events(events).await {
                break;
            }

            if self.notify.send(ScanState::Idle).await.is_err() {
                break;
            }
        }
    }

    /// Performs the initial recursive scan of the worktree root, fanning
    /// the per-directory work out to one worker task per CPU.
    async fn scan_dirs(&mut self) -> Result<()> {
        let root_char_bag;
        let next_entry_id;
        let is_dir;
        {
            let snapshot = self.snapshot.lock();
            root_char_bag = snapshot.root_char_bag;
            next_entry_id = snapshot.next_entry_id.clone();
            is_dir = snapshot.root_entry().map_or(false, |e| e.is_dir())
        };

        if is_dir {
            let path: Arc<Path> = Arc::from(Path::new(""));
            let abs_path = self.abs_path();
            let (tx, rx) = channel::unbounded();
            tx.send(ScanJob {
                abs_path: abs_path.to_path_buf(),
                path,
                ignore_stack: IgnoreStack::none(),
                scan_queue: tx.clone(),
            })
            .await
            .unwrap();
            // Each queued job holds a clone of the sender, so the channel
            // closes automatically once every directory has been scanned.
            drop(tx);

            self.executor
                .scoped(|scope| {
                    for _ in 0..self.executor.num_cpus() {
                        scope.spawn(async {
                            while let Ok(job) = rx.recv().await {
                                if let Err(err) = self
                                    .scan_dir(root_char_bag, next_entry_id.clone(), &job)
                                    .await
                                {
                                    log::error!("error scanning {:?}: {}", job.abs_path, err);
                                }
                            }
                        });
                    }
                })
                .await;
        }

        Ok(())
    }

    /// Scans a single directory: reads its children, computes their ignore
    /// status, records them in the snapshot, and enqueues jobs for any
    /// subdirectories.
    async fn scan_dir(
        &self,
        root_char_bag: CharBag,
        next_entry_id: Arc<AtomicUsize>,
        job: &ScanJob,
    ) -> Result<()> {
        let mut new_entries: Vec<Entry> = Vec::new();
        let mut new_jobs: Vec<ScanJob> = Vec::new();
        let mut ignore_stack = job.ignore_stack.clone();
        let mut new_ignore = None;

        let mut child_paths = self.fs.read_dir(&job.abs_path).await?;
        while let Some(child_abs_path) = child_paths.next().await {
            let child_abs_path = match child_abs_path {
                Ok(child_abs_path) => child_abs_path,
                Err(error) => {
                    log::error!("error processing entry {:?}", error);
                    continue;
                }
            };
            let child_name = child_abs_path.file_name().unwrap();
            let child_path: Arc<Path> = job.path.join(child_name).into();
            let child_metadata = match self.fs.metadata(&child_abs_path).await? {
                Some(metadata) => metadata,
                // The child disappeared between listing and statting it.
                None => continue,
            };

            // If we find a .gitignore, add it to the stack of ignores used to determine which paths are ignored
            if child_name == *GITIGNORE {
                let (ignore, err) = Gitignore::new(&child_abs_path);
                if let Some(err) = err {
                    log::error!("error in ignore file {:?} - {:?}", child_name, err);
                }
                let ignore = Arc::new(ignore);
                ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
                new_ignore = Some(ignore);

                // Update the ignore status of any child entries we've already processed to
                // reflect the ignore file in the current directory. Because `.gitignore`
                // starts with a `.`, it tends to appear early in the listing, so these
                // should rarely be numerous. Update the ignore stack associated with any
                // new jobs as well.
                let mut new_jobs = new_jobs.iter_mut();
                for entry in &mut new_entries {
                    entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
                    if entry.is_dir() {
                        // Jobs were pushed in the same order as directory
                        // entries, so this pairing stays aligned.
                        new_jobs.next().unwrap().ignore_stack = if entry.is_ignored {
                            IgnoreStack::all()
                        } else {
                            ignore_stack.clone()
                        };
                    }
                }
            }

            let mut child_entry = Entry::new(
                child_path.clone(),
                &child_metadata,
                &next_entry_id,
                root_char_bag,
            );

            if child_metadata.is_dir {
                let is_ignored = ignore_stack.is_path_ignored(&child_path, true);
                child_entry.is_ignored = is_ignored;
                new_entries.push(child_entry);
                new_jobs.push(ScanJob {
                    abs_path: child_abs_path,
                    path: child_path,
                    ignore_stack: if is_ignored {
                        IgnoreStack::all()
                    } else {
                        ignore_stack.clone()
                    },
                    scan_queue: job.scan_queue.clone(),
                });
            } else {
                child_entry.is_ignored = ignore_stack.is_path_ignored(&child_path, false);
                new_entries.push(child_entry);
            };
        }

        self.snapshot
            .lock()
            .populate_dir(job.path.clone(), new_entries, new_ignore);
        for new_job in new_jobs {
            job.scan_queue.send(new_job).await.unwrap();
        }

        Ok(())
    }

    /// Applies a batch of file system events to the snapshot. Returns
    /// `false` if the worktree root can no longer be resolved on disk.
    async fn process_events(&mut self, mut events: Vec<fsevent::Event>) -> bool {
        let mut snapshot = self.snapshot();
        snapshot.scan_id += 1;

        let root_abs_path = if let Ok(abs_path) = self.fs.canonicalize(&snapshot.abs_path).await {
            abs_path
        } else {
            return false;
        };
        let root_char_bag = snapshot.root_char_bag;
        let next_entry_id = snapshot.next_entry_id.clone();

        // Sort and de-duplicate so that an event for a parent path subsumes
        // events for its descendants.
        events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
        events.dedup_by(|a, b| a.path.starts_with(&b.path));

        // First remove all affected subtrees, remembering removed entry ids
        // by inode so renames can be detected when re-inserting below.
        for event in &events {
            match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => snapshot.remove_path(&path),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            }
        }

        let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
        for event in events {
            let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => Arc::from(path.to_path_buf()),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            };

            match self.fs.metadata(&event.path).await {
                Ok(Some(metadata)) => {
                    let ignore_stack = snapshot.ignore_stack_for_path(&path, metadata.is_dir);
                    let mut fs_entry = Entry::new(
                        path.clone(),
                        &metadata,
                        snapshot.next_entry_id.as_ref(),
                        snapshot.root_char_bag,
                    );
                    fs_entry.is_ignored = ignore_stack.is_all();
                    snapshot.insert_entry(fs_entry);
                    if metadata.is_dir {
                        scan_queue_tx
                            .send(ScanJob {
                                abs_path: event.path,
                                path,
                                ignore_stack,
                                scan_queue: scan_queue_tx.clone(),
                            })
                            .await
                            .unwrap();
                    }
                }
                // The path no longer exists; its removal was handled above.
                Ok(None) => {}
                Err(err) => {
                    // TODO - create a special 'error' entry in the entries tree to mark this
                    log::error!("error reading file on event {:?}", err);
                }
            }
        }

        *self.snapshot.lock() = snapshot;

        // Scan any directories that were created as part of this event batch.
        drop(scan_queue_tx);
        self.executor
            .scoped(|scope| {
                for _ in 0..self.executor.num_cpus() {
                    scope.spawn(async {
                        while let Ok(job) = scan_queue_rx.recv().await {
                            if let Err(err) = self
                                .scan_dir(root_char_bag, next_entry_id.clone(), &job)
                                .await
                            {
                                log::error!("error scanning {:?}: {}", job.abs_path, err);
                            }
                        }
                    });
                }
            })
            .await;

        // Attempt to detect renames only over a single batch of file-system events.
        self.snapshot.lock().removed_entry_ids.clear();

        self.update_ignore_statuses().await;
        true
    }

    /// Recomputes ignore statuses for directories whose `.gitignore` files
    /// changed in this scan, and drops ignores whose files were deleted.
    async fn update_ignore_statuses(&self) {
        let mut snapshot = self.snapshot();

        let mut ignores_to_update = Vec::new();
        let mut ignores_to_delete = Vec::new();
        for (parent_path, (_, scan_id)) in &snapshot.ignores {
            if *scan_id == snapshot.scan_id && snapshot.entry_for_path(parent_path).is_some() {
                ignores_to_update.push(parent_path.clone());
            }

            let ignore_path = parent_path.join(&*GITIGNORE);
            if snapshot.entry_for_path(ignore_path).is_none() {
                ignores_to_delete.push(parent_path.clone());
            }
        }

        for parent_path in ignores_to_delete {
            snapshot.ignores.remove(&parent_path);
            self.snapshot.lock().ignores.remove(&parent_path);
        }

        let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
        ignores_to_update.sort_unstable();
        let mut ignores_to_update = ignores_to_update.into_iter().peekable();
        while let Some(parent_path) = ignores_to_update.next() {
            // Skip descendants of a path that will be updated anyway, since
            // updating an ancestor re-walks its whole subtree.
            while ignores_to_update
                .peek()
                .map_or(false, |p| p.starts_with(&parent_path))
            {
                ignores_to_update.next().unwrap();
            }

            let ignore_stack = snapshot.ignore_stack_for_path(&parent_path, true);
            ignore_queue_tx
                .send(UpdateIgnoreStatusJob {
                    path: parent_path,
                    ignore_stack,
                    ignore_queue: ignore_queue_tx.clone(),
                })
                .await
                .unwrap();
        }
        drop(ignore_queue_tx);

        self.executor
            .scoped(|scope| {
                for _ in 0..self.executor.num_cpus() {
                    scope.spawn(async {
                        while let Ok(job) = ignore_queue_rx.recv().await {
                            self.update_ignore_status(job, &snapshot).await;
                        }
                    });
                }
            })
            .await;
    }

    /// Recomputes the ignore status of one directory's children, enqueues
    /// jobs for subdirectories, and writes any changed entries into the
    /// shared snapshot.
    async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) {
        let mut ignore_stack = job.ignore_stack;
        if let Some((ignore, _)) = snapshot.ignores.get(&job.path) {
            ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
        }

        let mut edits = Vec::new();
        for mut entry in snapshot.child_entries(&job.path).cloned() {
            let was_ignored = entry.is_ignored;
            entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
            if entry.is_dir() {
                let child_ignore_stack = if entry.is_ignored {
                    IgnoreStack::all()
                } else {
                    ignore_stack.clone()
                };
                job.ignore_queue
                    .send(UpdateIgnoreStatusJob {
                        path: entry.path.clone(),
                        ignore_stack: child_ignore_stack,
                        ignore_queue: job.ignore_queue.clone(),
                    })
                    .await
                    .unwrap();
            }

            if entry.is_ignored != was_ignored {
                edits.push(Edit::Insert(entry));
            }
        }
        self.snapshot.lock().entries_by_path.edit(edits, &());
    }
}
2287
2288async fn refresh_entry(
2289 fs: &dyn Fs,
2290 snapshot: &Mutex<Snapshot>,
2291 path: Arc<Path>,
2292 abs_path: &Path,
2293) -> Result<Entry> {
2294 let root_char_bag;
2295 let next_entry_id;
2296 {
2297 let snapshot = snapshot.lock();
2298 root_char_bag = snapshot.root_char_bag;
2299 next_entry_id = snapshot.next_entry_id.clone();
2300 }
2301 let entry = Entry::new(
2302 path,
2303 &fs.metadata(abs_path)
2304 .await?
2305 .ok_or_else(|| anyhow!("could not read saved file metadata"))?,
2306 &next_entry_id,
2307 root_char_bag,
2308 );
2309 Ok(snapshot.lock().insert_entry(entry))
2310}
2311
2312fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
2313 let mut result = root_char_bag;
2314 result.extend(
2315 path.to_string_lossy()
2316 .chars()
2317 .map(|c| c.to_ascii_lowercase()),
2318 );
2319 result
2320}
2321
/// A unit of work for the background scanner: scan the directory at
/// `abs_path` (worktree-relative path `path`) using `ignore_stack` to
/// compute ignore status, pushing follow-up jobs for subdirectories onto
/// `scan_queue`.
struct ScanJob {
    abs_path: PathBuf,
    path: Arc<Path>,
    ignore_stack: Arc<IgnoreStack>,
    // Cloned into each child job so the queue stays open while work remains.
    scan_queue: Sender<ScanJob>,
}
2328
/// A unit of work for re-evaluating ignore status: recompute `is_ignored`
/// for the entries under `path` given `ignore_stack`, pushing follow-up
/// jobs for subdirectories onto `ignore_queue`.
struct UpdateIgnoreStatusJob {
    path: Arc<Path>,
    ignore_stack: Arc<IgnoreStack>,
    // Cloned into each child job so the queue stays open while work remains.
    ignore_queue: Sender<UpdateIgnoreStatusJob>,
}
2334
/// Test-only extension methods for `ModelHandle<Worktree>`.
pub trait WorktreeHandle {
    /// Waits until all pending FS events for the worktree's directory have
    /// been observed and processed. See the impl for details.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()>;
}
2342
impl WorktreeHandle for ModelHandle<Worktree> {
    // The worktree's FS event stream sometimes delivers "redundant" events for FS changes that
    // occurred before the worktree was constructed. These events can cause the worktree to perform
    // extra directory scans, and emit extra scan-state notifications.
    //
    // This function mutates the worktree's directory and waits for those mutations to be picked up,
    // to ensure that all redundant FS events have already been processed.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()> {
        use smol::future::FutureExt;

        let filename = "fs-event-sentinel";
        let root_path = cx.read(|cx| self.read(cx).abs_path.clone());
        let tree = self.clone();
        async move {
            // Create a sentinel file and wait for the worktree to notice it,
            // then remove it and wait for the removal to be noticed too.
            std::fs::write(root_path.join(filename), "").unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_some())
                .await;

            std::fs::remove_file(root_path.join(filename)).unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_none())
                .await;

            // Finally, wait for any in-flight scan to finish.
            cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
                .await;
        }
        .boxed_local()
    }
}
2375
/// Iterates a snapshot's file entries in path order: either every file
/// (`All`) or only files that are not ignored (`Visible`).
pub enum FileIter<'a> {
    All(Cursor<'a, Entry, FileCount, ()>),
    Visible(Cursor<'a, Entry, VisibleFileCount, ()>),
}
2380
2381impl<'a> FileIter<'a> {
2382 fn all(snapshot: &'a Snapshot, start: usize) -> Self {
2383 let mut cursor = snapshot.entries_by_path.cursor();
2384 cursor.seek(&FileCount(start), Bias::Right, &());
2385 Self::All(cursor)
2386 }
2387
2388 fn visible(snapshot: &'a Snapshot, start: usize) -> Self {
2389 let mut cursor = snapshot.entries_by_path.cursor();
2390 cursor.seek(&VisibleFileCount(start), Bias::Right, &());
2391 Self::Visible(cursor)
2392 }
2393
2394 fn next_internal(&mut self) {
2395 match self {
2396 Self::All(cursor) => {
2397 let ix = *cursor.seek_start();
2398 cursor.seek_forward(&FileCount(ix.0 + 1), Bias::Right, &());
2399 }
2400 Self::Visible(cursor) => {
2401 let ix = *cursor.seek_start();
2402 cursor.seek_forward(&VisibleFileCount(ix.0 + 1), Bias::Right, &());
2403 }
2404 }
2405 }
2406
2407 fn item(&self) -> Option<&'a Entry> {
2408 match self {
2409 Self::All(cursor) => cursor.item(),
2410 Self::Visible(cursor) => cursor.item(),
2411 }
2412 }
2413}
2414
2415impl<'a> Iterator for FileIter<'a> {
2416 type Item = &'a Entry;
2417
2418 fn next(&mut self) -> Option<Self::Item> {
2419 if let Some(entry) = self.item() {
2420 self.next_internal();
2421 Some(entry)
2422 } else {
2423 None
2424 }
2425 }
2426}
2427
/// Iterates the direct children of `parent_path` within a snapshot,
/// skipping over each child's own descendants.
struct ChildEntriesIter<'a> {
    parent_path: &'a Path,
    cursor: Cursor<'a, Entry, PathSearch<'a>, ()>,
}
2432
2433impl<'a> ChildEntriesIter<'a> {
2434 fn new(parent_path: &'a Path, snapshot: &'a Snapshot) -> Self {
2435 let mut cursor = snapshot.entries_by_path.cursor();
2436 cursor.seek(&PathSearch::Exact(parent_path), Bias::Right, &());
2437 Self {
2438 parent_path,
2439 cursor,
2440 }
2441 }
2442}
2443
2444impl<'a> Iterator for ChildEntriesIter<'a> {
2445 type Item = &'a Entry;
2446
2447 fn next(&mut self) -> Option<Self::Item> {
2448 if let Some(item) = self.cursor.item() {
2449 if item.path.starts_with(self.parent_path) {
2450 self.cursor
2451 .seek_forward(&PathSearch::Successor(&item.path), Bias::Left, &());
2452 Some(item)
2453 } else {
2454 None
2455 }
2456 } else {
2457 None
2458 }
2459 }
2460}
2461
impl<'a> From<&'a Entry> for proto::Entry {
    /// Converts a local worktree entry into its RPC wire representation.
    fn from(entry: &'a Entry) -> Self {
        Self {
            id: entry.id as u64,
            is_dir: entry.is_dir(),
            // Paths travel over the wire as strings; non-UTF-8 components
            // are lossily converted.
            path: entry.path.to_string_lossy().to_string(),
            inode: entry.inode,
            mtime: Some(entry.mtime.into()),
            is_symlink: entry.is_symlink,
            is_ignored: entry.is_ignored,
        }
    }
}
2475
2476impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
2477 type Error = anyhow::Error;
2478
2479 fn try_from((root_char_bag, entry): (&'a CharBag, proto::Entry)) -> Result<Self> {
2480 if let Some(mtime) = entry.mtime {
2481 let kind = if entry.is_dir {
2482 EntryKind::Dir
2483 } else {
2484 let mut char_bag = root_char_bag.clone();
2485 char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase()));
2486 EntryKind::File(char_bag)
2487 };
2488 let path: Arc<Path> = Arc::from(Path::new(&entry.path));
2489 Ok(Entry {
2490 id: entry.id as usize,
2491 kind,
2492 path: path.clone(),
2493 inode: entry.inode,
2494 mtime: mtime.into(),
2495 is_symlink: entry.is_symlink,
2496 is_ignored: entry.is_ignored,
2497 })
2498 } else {
2499 Err(anyhow!(
2500 "missing mtime in remote worktree entry {:?}",
2501 entry.path
2502 ))
2503 }
2504 }
2505}
2506
2507#[cfg(test)]
2508mod tests {
2509 use super::*;
2510 use crate::test::*;
2511 use anyhow::Result;
2512 use fs::RealFs;
2513 use rand::prelude::*;
2514 use serde_json::json;
2515 use std::time::UNIX_EPOCH;
2516 use std::{env, fmt::Write, os::unix, time::SystemTime};
2517
    // Scans a small tree (including symlinked directories) and verifies that
    // fuzzy path matching returns the expected entries.
    #[gpui::test]
    async fn test_populate_and_search(cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "root": {
                "apple": "",
                "banana": {
                    "carrot": {
                        "date": "",
                        "endive": "",
                    }
                },
                "fennel": {
                    "grape": "",
                }
            }
        }));

        // Open the worktree through a symlink to the root, and add a
        // symlinked directory inside the tree, to exercise symlink handling.
        let root_link_path = dir.path().join("root_link");
        unix::fs::symlink(&dir.path().join("root"), &root_link_path).unwrap();
        unix::fs::symlink(
            &dir.path().join("root/fennel"),
            &dir.path().join("root/finnochio"),
        )
        .unwrap();

        let tree = Worktree::open_local(
            root_link_path,
            Default::default(),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        // Wait for the initial background scan to complete.
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        let snapshots = [cx.read(|cx| {
            let tree = tree.read(cx);
            assert_eq!(tree.file_count(), 5);
            // The symlinked directory's files share inodes with the target's.
            assert_eq!(
                tree.inode_for_path("fennel/grape"),
                tree.inode_for_path("finnochio/grape")
            );
            tree.snapshot()
        })];
        let cancel_flag = Default::default();
        // Fuzzy-match "bna" against all paths in the snapshot.
        let results = cx
            .read(|cx| {
                match_paths(
                    &snapshots,
                    "bna",
                    false,
                    false,
                    10,
                    &cancel_flag,
                    cx.background().clone(),
                )
            })
            .await;
        assert_eq!(
            results
                .into_iter()
                .map(|result| result.path)
                .collect::<Vec<Arc<Path>>>(),
            vec![
                PathBuf::from("banana/carrot/date").into(),
                PathBuf::from("banana/carrot/endive").into(),
            ]
        );
    }
2588
    // Verifies that fuzzy matching over a worktree containing only
    // directories (no files) yields no results rather than erroring.
    #[gpui::test]
    async fn test_search_worktree_without_files(cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "root": {
                "dir1": {},
                "dir2": {
                    "dir3": {}
                }
            }
        }));
        let tree = Worktree::open_local(
            dir.path(),
            Default::default(),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        // Wait for the initial background scan to complete.
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        let snapshots = [cx.read(|cx| {
            let tree = tree.read(cx);
            assert_eq!(tree.file_count(), 0);
            tree.snapshot()
        })];
        let cancel_flag = Default::default();
        let results = cx
            .read(|cx| {
                match_paths(
                    &snapshots,
                    "dir",
                    false,
                    false,
                    10,
                    &cancel_flag,
                    cx.background().clone(),
                )
            })
            .await;
        // Directories never match; only files are candidates.
        assert_eq!(
            results
                .into_iter()
                .map(|result| result.path)
                .collect::<Vec<Arc<Path>>>(),
            vec![]
        );
    }
2637
    // Edits a buffer opened from a directory worktree, saves it, and checks
    // that the file on disk matches the buffer contents.
    #[gpui::test]
    async fn test_save_file(mut cx: gpui::TestAppContext) {
        let app_state = cx.read(build_app_state);
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));
        let tree = Worktree::open_local(
            dir.path(),
            app_state.languages.clone(),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        // Make the buffer large enough that the save is non-trivial.
        let save = buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        let new_text = std::fs::read_to_string(dir.path().join("file1")).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
2665
    // Same as test_save_file, but the worktree root is the file itself
    // (a single-file worktree), so the buffer is opened at the empty path.
    #[gpui::test]
    async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) {
        let app_state = cx.read(build_app_state);
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));
        let file_path = dir.path().join("file1");

        let tree = Worktree::open_local(
            file_path.clone(),
            app_state.languages.clone(),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        // A single-file worktree contains exactly one file.
        cx.read(|cx| assert_eq!(tree.read(cx).file_count(), 1));

        // The root file is addressed by the empty relative path.
        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("", cx))
            .await
            .unwrap();
        let save = buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        let new_text = std::fs::read_to_string(file_path).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
2699
2700 #[gpui::test]
2701 async fn test_rescan_and_remote_updates(mut cx: gpui::TestAppContext) {
2702 let dir = temp_tree(json!({
2703 "a": {
2704 "file1": "",
2705 "file2": "",
2706 "file3": "",
2707 },
2708 "b": {
2709 "c": {
2710 "file4": "",
2711 "file5": "",
2712 }
2713 }
2714 }));
2715
2716 let tree = Worktree::open_local(
2717 dir.path(),
2718 Default::default(),
2719 Arc::new(RealFs),
2720 &mut cx.to_async(),
2721 )
2722 .await
2723 .unwrap();
2724
2725 let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| {
2726 let buffer = tree.update(cx, |tree, cx| tree.open_buffer(path, cx));
2727 async move { buffer.await.unwrap() }
2728 };
2729 let id_for_path = |path: &'static str, cx: &gpui::TestAppContext| {
2730 tree.read_with(cx, |tree, _| {
2731 tree.entry_for_path(path)
2732 .expect(&format!("no entry for path {}", path))
2733 .id
2734 })
2735 };
2736
2737 let buffer2 = buffer_for_path("a/file2", &mut cx).await;
2738 let buffer3 = buffer_for_path("a/file3", &mut cx).await;
2739 let buffer4 = buffer_for_path("b/c/file4", &mut cx).await;
2740 let buffer5 = buffer_for_path("b/c/file5", &mut cx).await;
2741
2742 let file2_id = id_for_path("a/file2", &cx);
2743 let file3_id = id_for_path("a/file3", &cx);
2744 let file4_id = id_for_path("b/c/file4", &cx);
2745
2746 // Wait for the initial scan.
2747 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
2748 .await;
2749
2750 // Create a remote copy of this worktree.
2751 let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
2752 let worktree_id = 1;
2753 let share_request = tree
2754 .update(&mut cx, |tree, cx| {
2755 tree.as_local().unwrap().share_request(cx)
2756 })
2757 .await;
2758 let remote = Worktree::remote(
2759 proto::OpenWorktreeResponse {
2760 worktree_id,
2761 worktree: share_request.worktree,
2762 replica_id: 1,
2763 peers: Vec::new(),
2764 },
2765 rpc::Client::new(),
2766 Default::default(),
2767 &mut cx.to_async(),
2768 )
2769 .await
2770 .unwrap();
2771
2772 cx.read(|cx| {
2773 assert!(!buffer2.read(cx).is_dirty());
2774 assert!(!buffer3.read(cx).is_dirty());
2775 assert!(!buffer4.read(cx).is_dirty());
2776 assert!(!buffer5.read(cx).is_dirty());
2777 });
2778
2779 // Rename and delete files and directories.
2780 tree.flush_fs_events(&cx).await;
2781 std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
2782 std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
2783 std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap();
2784 std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
2785 tree.flush_fs_events(&cx).await;
2786
2787 let expected_paths = vec![
2788 "a",
2789 "a/file1",
2790 "a/file2.new",
2791 "b",
2792 "d",
2793 "d/file3",
2794 "d/file4",
2795 ];
2796
2797 cx.read(|app| {
2798 assert_eq!(
2799 tree.read(app)
2800 .paths()
2801 .map(|p| p.to_str().unwrap())
2802 .collect::<Vec<_>>(),
2803 expected_paths
2804 );
2805
2806 assert_eq!(id_for_path("a/file2.new", &cx), file2_id);
2807 assert_eq!(id_for_path("d/file3", &cx), file3_id);
2808 assert_eq!(id_for_path("d/file4", &cx), file4_id);
2809
2810 assert_eq!(
2811 buffer2.read(app).file().unwrap().path().as_ref(),
2812 Path::new("a/file2.new")
2813 );
2814 assert_eq!(
2815 buffer3.read(app).file().unwrap().path().as_ref(),
2816 Path::new("d/file3")
2817 );
2818 assert_eq!(
2819 buffer4.read(app).file().unwrap().path().as_ref(),
2820 Path::new("d/file4")
2821 );
2822 assert_eq!(
2823 buffer5.read(app).file().unwrap().path().as_ref(),
2824 Path::new("b/c/file5")
2825 );
2826
2827 assert!(!buffer2.read(app).file().unwrap().is_deleted());
2828 assert!(!buffer3.read(app).file().unwrap().is_deleted());
2829 assert!(!buffer4.read(app).file().unwrap().is_deleted());
2830 assert!(buffer5.read(app).file().unwrap().is_deleted());
2831 });
2832
2833 // Update the remote worktree. Check that it becomes consistent with the
2834 // local worktree.
2835 remote.update(&mut cx, |remote, cx| {
2836 let update_message = tree
2837 .read(cx)
2838 .snapshot()
2839 .build_update(&initial_snapshot, worktree_id);
2840 remote
2841 .as_remote_mut()
2842 .unwrap()
2843 .snapshot
2844 .apply_update(update_message)
2845 .unwrap();
2846
2847 assert_eq!(
2848 remote
2849 .paths()
2850 .map(|p| p.to_str().unwrap())
2851 .collect::<Vec<_>>(),
2852 expected_paths
2853 );
2854 });
2855 }
2856
    // Verifies that .gitignore rules are applied both during the initial
    // scan and to files created afterwards, and that .git is always ignored.
    #[gpui::test]
    async fn test_rescan_with_gitignore(cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            ".git": {},
            ".gitignore": "ignored-dir\n",
            "tracked-dir": {
                "tracked-file1": "tracked contents",
            },
            "ignored-dir": {
                "ignored-file1": "ignored contents",
            }
        }));

        let tree = Worktree::open_local(
            dir.path(),
            Default::default(),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        tree.flush_fs_events(&cx).await;
        // Ignore status after the initial scan.
        cx.read(|cx| {
            let tree = tree.read(cx);
            let tracked = tree.entry_for_path("tracked-dir/tracked-file1").unwrap();
            let ignored = tree.entry_for_path("ignored-dir/ignored-file1").unwrap();
            assert_eq!(tracked.is_ignored, false);
            assert_eq!(ignored.is_ignored, true);
        });

        // Files created after the scan inherit the ignore status of their
        // parent directories.
        std::fs::write(dir.path().join("tracked-dir/tracked-file2"), "").unwrap();
        std::fs::write(dir.path().join("ignored-dir/ignored-file2"), "").unwrap();
        tree.flush_fs_events(&cx).await;
        cx.read(|cx| {
            let tree = tree.read(cx);
            let dot_git = tree.entry_for_path(".git").unwrap();
            let tracked = tree.entry_for_path("tracked-dir/tracked-file2").unwrap();
            let ignored = tree.entry_for_path("ignored-dir/ignored-file2").unwrap();
            assert_eq!(tracked.is_ignored, false);
            assert_eq!(ignored.is_ignored, true);
            assert_eq!(dot_git.is_ignored, true);
        });
    }
2902
    // Randomized test: mutate a real directory tree while feeding batches of
    // FS events to a BackgroundScanner, checking snapshot invariants after
    // each batch, then verify a fresh scan of the final tree produces an
    // identical snapshot.
    #[gpui::test(iterations = 100)]
    fn test_random(mut rng: StdRng) {
        // Both knobs are overridable from the environment for longer runs.
        let operations = env::var("OPERATIONS")
            .map(|o| o.parse().unwrap())
            .unwrap_or(40);
        let initial_entries = env::var("INITIAL_ENTRIES")
            .map(|o| o.parse().unwrap())
            .unwrap_or(20);

        let root_dir = tempdir::TempDir::new("worktree-test").unwrap();
        for _ in 0..initial_entries {
            randomly_mutate_tree(root_dir.path(), 1.0, &mut rng).unwrap();
        }
        log::info!("Generated initial tree");

        let (notify_tx, _notify_rx) = smol::channel::unbounded();
        let fs = Arc::new(RealFs);
        let next_entry_id = Arc::new(AtomicUsize::new(0));
        let mut initial_snapshot = Snapshot {
            id: 0,
            scan_id: 0,
            abs_path: root_dir.path().into(),
            entries_by_path: Default::default(),
            entries_by_id: Default::default(),
            removed_entry_ids: Default::default(),
            ignores: Default::default(),
            root_name: Default::default(),
            root_char_bag: Default::default(),
            next_entry_id: next_entry_id.clone(),
        };
        // Seed the snapshot with an entry for the root itself.
        initial_snapshot.insert_entry(Entry::new(
            Path::new("").into(),
            &smol::block_on(fs.metadata(root_dir.path()))
                .unwrap()
                .unwrap(),
            &next_entry_id,
            Default::default(),
        ));
        let mut scanner = BackgroundScanner::new(
            Arc::new(Mutex::new(initial_snapshot.clone())),
            notify_tx,
            fs.clone(),
            Arc::new(gpui::executor::Background::new()),
        );
        smol::block_on(scanner.scan_dirs()).unwrap();
        scanner.snapshot().check_invariants();

        // Interleave FS mutations with delivery of random-sized event
        // batches, checking invariants after each delivery.
        let mut events = Vec::new();
        let mut mutations_len = operations;
        while mutations_len > 1 {
            if !events.is_empty() && rng.gen_bool(0.4) {
                let len = rng.gen_range(0..=events.len());
                let to_deliver = events.drain(0..len).collect::<Vec<_>>();
                log::info!("Delivering events: {:#?}", to_deliver);
                smol::block_on(scanner.process_events(to_deliver));
                scanner.snapshot().check_invariants();
            } else {
                events.extend(randomly_mutate_tree(root_dir.path(), 0.6, &mut rng).unwrap());
                mutations_len -= 1;
            }
        }
        // Drain any remaining events so the snapshot reaches quiescence.
        log::info!("Quiescing: {:#?}", events);
        smol::block_on(scanner.process_events(events));
        scanner.snapshot().check_invariants();

        // A fresh scan of the final on-disk state must agree with the
        // incrementally-maintained snapshot.
        let (notify_tx, _notify_rx) = smol::channel::unbounded();
        let mut new_scanner = BackgroundScanner::new(
            Arc::new(Mutex::new(initial_snapshot)),
            notify_tx,
            scanner.fs.clone(),
            scanner.executor.clone(),
        );
        smol::block_on(new_scanner.scan_dirs()).unwrap();
        assert_eq!(scanner.snapshot().to_vec(), new_scanner.snapshot().to_vec());
    }
2978
    /// Applies one random filesystem mutation under `root_path` — creating a
    /// file/dir, writing a .gitignore, renaming, or deleting — and returns
    /// the synthetic FS events describing what changed.
    ///
    /// `insertion_probability` biases the choice toward creation, so that a
    /// value of 1.0 always grows the tree.
    fn randomly_mutate_tree(
        root_path: &Path,
        insertion_probability: f64,
        rng: &mut impl Rng,
    ) -> Result<Vec<fsevent::Event>> {
        let root_path = root_path.canonicalize().unwrap();
        let (dirs, files) = read_dir_recursive(root_path.clone());

        let mut events = Vec::new();
        let mut record_event = |path: PathBuf| {
            events.push(fsevent::Event {
                // Event ids only need to be roughly monotonic for this test.
                event_id: SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                flags: fsevent::StreamFlags::empty(),
                path,
            });
        };

        // Creation: always taken when the tree is effectively empty.
        if (files.is_empty() && dirs.len() == 1) || rng.gen_bool(insertion_probability) {
            let path = dirs.choose(rng).unwrap();
            let new_path = path.join(gen_name(rng));

            if rng.gen() {
                log::info!("Creating dir {:?}", new_path.strip_prefix(root_path)?);
                std::fs::create_dir(&new_path)?;
            } else {
                log::info!("Creating file {:?}", new_path.strip_prefix(root_path)?);
                std::fs::write(&new_path, "")?;
            }
            record_event(new_path);
        // Occasionally write a .gitignore covering a random subset of a
        // directory's contents.
        } else if rng.gen_bool(0.05) {
            let ignore_dir_path = dirs.choose(rng).unwrap();
            let ignore_path = ignore_dir_path.join(&*GITIGNORE);

            let (subdirs, subfiles) = read_dir_recursive(ignore_dir_path.clone());
            let files_to_ignore = {
                let len = rng.gen_range(0..=subfiles.len());
                subfiles.choose_multiple(rng, len)
            };
            let dirs_to_ignore = {
                let len = rng.gen_range(0..subdirs.len());
                subdirs.choose_multiple(rng, len)
            };

            let mut ignore_contents = String::new();
            for path_to_ignore in files_to_ignore.chain(dirs_to_ignore) {
                write!(
                    ignore_contents,
                    "{}\n",
                    path_to_ignore
                        .strip_prefix(&ignore_dir_path)?
                        .to_str()
                        .unwrap()
                )
                .unwrap();
            }
            log::info!(
                "Creating {:?} with contents:\n{}",
                ignore_path.strip_prefix(&root_path)?,
                ignore_contents
            );
            std::fs::write(&ignore_path, ignore_contents).unwrap();
            record_event(ignore_path);
        // Otherwise rename or delete an existing entry (never the root,
        // hence `dirs[1..]`).
        } else {
            let old_path = {
                let file_path = files.choose(rng);
                let dir_path = dirs[1..].choose(rng);
                file_path.into_iter().chain(dir_path).choose(rng).unwrap()
            };

            let is_rename = rng.gen();
            if is_rename {
                // Pick a destination that isn't inside the entry being moved.
                let new_path_parent = dirs
                    .iter()
                    .filter(|d| !d.starts_with(old_path))
                    .choose(rng)
                    .unwrap();

                let overwrite_existing_dir =
                    !old_path.starts_with(&new_path_parent) && rng.gen_bool(0.3);
                let new_path = if overwrite_existing_dir {
                    std::fs::remove_dir_all(&new_path_parent).ok();
                    new_path_parent.to_path_buf()
                } else {
                    new_path_parent.join(gen_name(rng))
                };

                log::info!(
                    "Renaming {:?} to {}{:?}",
                    old_path.strip_prefix(&root_path)?,
                    if overwrite_existing_dir {
                        "overwrite "
                    } else {
                        ""
                    },
                    new_path.strip_prefix(&root_path)?
                );
                std::fs::rename(&old_path, &new_path)?;
                record_event(old_path.clone());
                record_event(new_path);
            } else if old_path.is_dir() {
                let (dirs, files) = read_dir_recursive(old_path.clone());

                log::info!("Deleting dir {:?}", old_path.strip_prefix(&root_path)?);
                std::fs::remove_dir_all(&old_path).unwrap();
                // Emit an event for every path that disappeared with the dir.
                for file in files {
                    record_event(file);
                }
                for dir in dirs {
                    record_event(dir);
                }
            } else {
                log::info!("Deleting file {:?}", old_path.strip_prefix(&root_path)?);
                std::fs::remove_file(old_path).unwrap();
                record_event(old_path.clone());
            }
        }

        Ok(events)
    }
3101
3102 fn read_dir_recursive(path: PathBuf) -> (Vec<PathBuf>, Vec<PathBuf>) {
3103 let child_entries = std::fs::read_dir(&path).unwrap();
3104 let mut dirs = vec![path];
3105 let mut files = Vec::new();
3106 for child_entry in child_entries {
3107 let child_path = child_entry.unwrap().path();
3108 if child_path.is_dir() {
3109 let (child_dirs, child_files) = read_dir_recursive(child_path);
3110 dirs.extend(child_dirs);
3111 files.extend(child_files);
3112 } else {
3113 files.push(child_path);
3114 }
3115 }
3116 (dirs, files)
3117 }
3118
3119 fn gen_name(rng: &mut impl Rng) -> String {
3120 (0..6)
3121 .map(|_| rng.sample(rand::distributions::Alphanumeric))
3122 .map(char::from)
3123 .collect()
3124 }
3125
    impl Snapshot {
        /// Asserts the structural invariants of a snapshot: the file
        /// iterators agree with the entry tree, entries are stored in
        /// breadth-first-compatible path order, and every registered
        /// .gitignore has matching entries.
        fn check_invariants(&self) {
            // Every file entry must appear, in order, in the `files`
            // iterator; non-ignored files must also appear in
            // `visible_files`.
            let mut files = self.files(0);
            let mut visible_files = self.visible_files(0);
            for entry in self.entries_by_path.cursor::<(), ()>() {
                if entry.is_file() {
                    assert_eq!(files.next().unwrap().inode, entry.inode);
                    if !entry.is_ignored {
                        assert_eq!(visible_files.next().unwrap().inode, entry.inode);
                    }
                }
            }
            // Both iterators must be exhausted — no extra files.
            assert!(files.next().is_none());
            assert!(visible_files.next().is_none());

            // Walk the tree via child_entries; the paths produced must match
            // a straight cursor traversal of entries_by_path.
            let mut bfs_paths = Vec::new();
            let mut stack = vec![Path::new("")];
            while let Some(path) = stack.pop() {
                bfs_paths.push(path);
                let ix = stack.len();
                // Insert children at a fixed position so they are visited
                // in order before previously-stacked siblings.
                for child_entry in self.child_entries(path) {
                    stack.insert(ix, &child_entry.path);
                }
            }

            let dfs_paths = self
                .entries_by_path
                .cursor::<(), ()>()
                .map(|e| e.path.as_ref())
                .collect::<Vec<_>>();
            assert_eq!(bfs_paths, dfs_paths);

            // Every tracked .gitignore must correspond to an existing
            // directory entry and an existing .gitignore file entry.
            for (ignore_parent_path, _) in &self.ignores {
                assert!(self.entry_for_path(ignore_parent_path).is_some());
                assert!(self
                    .entry_for_path(ignore_parent_path.join(&*GITIGNORE))
                    .is_some());
            }
        }

        /// Flattens the snapshot into a sorted `(path, inode, is_ignored)`
        /// list, for order-insensitive comparison between snapshots.
        fn to_vec(&self) -> Vec<(&Path, u64, bool)> {
            let mut paths = Vec::new();
            for entry in self.entries_by_path.cursor::<(), ()>() {
                paths.push((entry.path.as_ref(), entry.inode, entry.is_ignored));
            }
            paths.sort_by(|a, b| a.0.cmp(&b.0));
            paths
        }
    }
3175}