1mod char_bag;
2mod fuzzy;
3mod ignore;
4
5use self::{char_bag::CharBag, ignore::IgnoreStack};
6use crate::{
7 editor::{self, Buffer, History, Operation, Rope},
8 fs::{self, Fs},
9 language::LanguageRegistry,
10 rpc::{self, proto},
11 sum_tree::{self, Cursor, Edit, SumTree},
12 time::{self, ReplicaId},
13 util::Bias,
14};
15use ::ignore::gitignore::Gitignore;
16use anyhow::{anyhow, Result};
17use futures::{Stream, StreamExt};
18pub use fuzzy::{match_paths, PathMatch};
19use gpui::{
20 executor, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
21 Task, WeakModelHandle,
22};
23use lazy_static::lazy_static;
24use parking_lot::Mutex;
25use postage::{
26 prelude::{Sink as _, Stream as _},
27 watch,
28};
29use smol::channel::{self, Sender};
30use std::{
31 cmp::{self, Ordering},
32 collections::HashMap,
33 convert::{TryFrom, TryInto},
34 ffi::{OsStr, OsString},
35 fmt,
36 future::Future,
37 ops::Deref,
38 path::{Path, PathBuf},
39 sync::{
40 atomic::{AtomicUsize, Ordering::SeqCst},
41 Arc,
42 },
43 time::{Duration, SystemTime},
44};
45use zrpc::{ForegroundRouter, PeerId, TypedEnvelope};
46
lazy_static! {
    // Interned `OsStr` for the ".gitignore" file name, so directory-entry name
    // comparisons don't re-allocate it on every check.
    static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
}
50
/// Registers all worktree-related RPC message handlers on the given router.
///
/// Called once at startup so that incoming peer/worktree/buffer messages are
/// dispatched to the corresponding `remote::*` handlers (defined elsewhere in
/// this module's `remote` submodule — not visible in this chunk).
pub fn init(cx: &mut MutableAppContext, rpc: &rpc::Client, router: &mut ForegroundRouter) {
    rpc.on_message(router, remote::add_peer, cx);
    rpc.on_message(router, remote::remove_peer, cx);
    rpc.on_message(router, remote::update_worktree, cx);
    rpc.on_message(router, remote::open_buffer, cx);
    rpc.on_message(router, remote::close_buffer, cx);
    rpc.on_message(router, remote::update_buffer, cx);
    rpc.on_message(router, remote::buffer_saved, cx);
    rpc.on_message(router, remote::save_buffer, cx);
}
61
/// Progress of the background filesystem scan for a local worktree.
#[derive(Clone, Debug)]
enum ScanState {
    /// No scan is currently running.
    Idle,
    /// The background scanner is actively updating the snapshot.
    Scanning,
    /// The scan failed; the error is wrapped in `Arc` so the state stays `Clone`.
    Err(Arc<anyhow::Error>),
}
68
/// A tree of files being edited, backed either by the local filesystem
/// (`Local`) or by a snapshot replicated from a remote peer (`Remote`).
pub enum Worktree {
    Local(LocalWorktree),
    Remote(RemoteWorktree),
}
73
impl Entity for Worktree {
    type Event = ();

    /// On release, notify the RPC server that this worktree is closed and drop
    /// it from the client's shared-worktree registry.
    fn release(&mut self, cx: &mut MutableAppContext) {
        // A local worktree only has an rpc handle when it has been shared;
        // a remote worktree always has one.
        let rpc = match self {
            Self::Local(tree) => tree.rpc.clone(),
            Self::Remote(tree) => Some((tree.rpc.clone(), tree.remote_id)),
        };

        if let Some((rpc, worktree_id)) = rpc {
            // Fire-and-forget: the model is already being dropped, so errors
            // are only logged.
            cx.spawn(|_| async move {
                rpc.state
                    .write()
                    .await
                    .shared_worktrees
                    .remove(&worktree_id);
                if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await {
                    log::error!("error closing worktree {}: {}", worktree_id, err);
                }
            })
            .detach();
        }
    }
}
98
impl Worktree {
    /// Opens a worktree rooted at `path` on the local filesystem and starts
    /// the background scanner that keeps its snapshot in sync with disk.
    pub async fn open_local(
        path: impl Into<Arc<Path>>,
        languages: Arc<LanguageRegistry>,
        fs: Arc<dyn Fs>,
        cx: &mut AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let (tree, scan_states_tx) = LocalWorktree::new(path, languages, fs.clone(), cx).await?;
        tree.update(cx, |tree, cx| {
            let tree = tree.as_local_mut().unwrap();
            let abs_path = tree.snapshot.abs_path.clone();
            let background_snapshot = tree.background_snapshot.clone();
            let background = cx.background().clone();
            // The scanner task is stored on the worktree so it is dropped (and
            // therefore stops) together with the model.
            tree._background_scanner_task = Some(cx.background().spawn(async move {
                let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
                let scanner =
                    BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
                scanner.run(events).await;
            }));
        });
        Ok(tree)
    }

    /// Requests a shared worktree from the server by id and access token, then
    /// builds a `Remote` worktree from the response.
    pub async fn open_remote(
        rpc: rpc::Client,
        id: u64,
        access_token: String,
        languages: Arc<LanguageRegistry>,
        cx: &mut AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let response = rpc
            .request(proto::OpenWorktree {
                worktree_id: id,
                access_token,
            })
            .await?;

        Worktree::remote(response, rpc, languages, cx).await
    }

    /// Constructs a `Remote` worktree from an `OpenWorktreeResponse`: builds
    /// the initial entry trees on a background thread, then wires up two
    /// tasks — one applying incoming `UpdateWorktree` diffs to a watch
    /// channel, and one polling the model whenever that channel changes.
    async fn remote(
        open_response: proto::OpenWorktreeResponse,
        rpc: rpc::Client,
        languages: Arc<LanguageRegistry>,
        cx: &mut AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let worktree = open_response
            .worktree
            .ok_or_else(|| anyhow!("empty worktree"))?;

        let remote_id = open_response.worktree_id;
        let replica_id = open_response.replica_id as ReplicaId;
        let peers = open_response.peers;
        // Lowercased character bag of the root name, used for fuzzy matching.
        let root_char_bag: CharBag = worktree
            .root_name
            .chars()
            .map(|c| c.to_ascii_lowercase())
            .collect();
        let root_name = worktree.root_name.clone();
        // Deserializing every entry can be expensive for large trees, so do it
        // off the main thread.
        let (entries_by_path, entries_by_id) = cx
            .background()
            .spawn(async move {
                let mut entries_by_path_edits = Vec::new();
                let mut entries_by_id_edits = Vec::new();
                for entry in worktree.entries {
                    match Entry::try_from((&root_char_bag, entry)) {
                        Ok(entry) => {
                            entries_by_id_edits.push(Edit::Insert(PathEntry {
                                id: entry.id,
                                path: entry.path.clone(),
                                scan_id: 0,
                            }));
                            entries_by_path_edits.push(Edit::Insert(entry));
                        }
                        // Skip malformed entries rather than failing the whole open.
                        Err(err) => log::warn!("error for remote worktree entry {:?}", err),
                    }
                }

                let mut entries_by_path = SumTree::new();
                let mut entries_by_id = SumTree::new();
                entries_by_path.edit(entries_by_path_edits, &());
                entries_by_id.edit(entries_by_id_edits, &());
                (entries_by_path, entries_by_id)
            })
            .await;

        let worktree = cx.update(|cx| {
            cx.add_model(|cx: &mut ModelContext<Worktree>| {
                let snapshot = Snapshot {
                    id: cx.model_id(),
                    scan_id: 0,
                    // Remote worktrees have no local absolute path.
                    abs_path: Path::new("").into(),
                    root_name,
                    root_char_bag,
                    ignores: Default::default(),
                    entries_by_path,
                    entries_by_id,
                    removed_entry_ids: Default::default(),
                    next_entry_id: Default::default(),
                };

                let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
                let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());

                // Task 1: apply incoming worktree updates to the latest
                // snapshot and publish the result on the watch channel.
                cx.background()
                    .spawn(async move {
                        while let Some(update) = updates_rx.recv().await {
                            let mut snapshot = snapshot_tx.borrow().clone();
                            if let Err(error) = snapshot.apply_update(update) {
                                log::error!("error applying worktree update: {}", error);
                            }
                            *snapshot_tx.borrow_mut() = snapshot;
                        }
                    })
                    .detach();

                // Task 2: whenever a new snapshot is published, re-poll the
                // model (weakly held, so the task ends when the model drops).
                {
                    let mut snapshot_rx = snapshot_rx.clone();
                    cx.spawn_weak(|this, mut cx| async move {
                        while let Some(_) = snapshot_rx.recv().await {
                            if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
                                this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
                            } else {
                                break;
                            }
                        }
                    })
                    .detach();
                }

                Worktree::Remote(RemoteWorktree {
                    remote_id,
                    replica_id,
                    snapshot,
                    snapshot_rx,
                    updates_tx,
                    rpc: rpc.clone(),
                    open_buffers: Default::default(),
                    peers: peers
                        .into_iter()
                        .map(|p| (PeerId(p.peer_id), p.replica_id as ReplicaId))
                        .collect(),
                    languages,
                })
            })
        });
        // Register the worktree so incoming RPC messages can be routed to it.
        rpc.state
            .write()
            .await
            .shared_worktrees
            .insert(open_response.worktree_id, worktree.downgrade());

        Ok(worktree)
    }

    /// Returns the local variant, or `None` for a remote worktree.
    pub fn as_local(&self) -> Option<&LocalWorktree> {
        if let Worktree::Local(worktree) = self {
            Some(worktree)
        } else {
            None
        }
    }

    /// Mutable counterpart of [`Worktree::as_local`].
    pub fn as_local_mut(&mut self) -> Option<&mut LocalWorktree> {
        if let Worktree::Local(worktree) = self {
            Some(worktree)
        } else {
            None
        }
    }

    /// Returns the remote variant mutably, or `None` for a local worktree.
    pub fn as_remote_mut(&mut self) -> Option<&mut RemoteWorktree> {
        if let Worktree::Remote(worktree) = self {
            Some(worktree)
        } else {
            None
        }
    }

    /// Clones the current snapshot of either variant.
    pub fn snapshot(&self) -> Snapshot {
        match self {
            Worktree::Local(worktree) => worktree.snapshot(),
            Worktree::Remote(worktree) => worktree.snapshot(),
        }
    }

    /// The replica id of this worktree; the local host is always replica 0.
    pub fn replica_id(&self) -> ReplicaId {
        match self {
            Worktree::Local(_) => 0,
            Worktree::Remote(worktree) => worktree.replica_id,
        }
    }

    /// Handles an `AddPeer` message by delegating to the active variant.
    pub fn add_peer(
        &mut self,
        envelope: TypedEnvelope<proto::AddPeer>,
        cx: &mut ModelContext<Worktree>,
    ) -> Result<()> {
        match self {
            Worktree::Local(worktree) => worktree.add_peer(envelope, cx),
            Worktree::Remote(worktree) => worktree.add_peer(envelope, cx),
        }
    }

    /// Handles a `RemovePeer` message by delegating to the active variant.
    pub fn remove_peer(
        &mut self,
        envelope: TypedEnvelope<proto::RemovePeer>,
        cx: &mut ModelContext<Worktree>,
    ) -> Result<()> {
        match self {
            Worktree::Local(worktree) => worktree.remove_peer(envelope, cx),
            Worktree::Remote(worktree) => worktree.remove_peer(envelope, cx),
        }
    }

    /// The peers currently collaborating on this worktree, by replica id.
    pub fn peers(&self) -> &HashMap<PeerId, ReplicaId> {
        match self {
            Worktree::Local(worktree) => &worktree.peers,
            Worktree::Remote(worktree) => &worktree.peers,
        }
    }

    /// Opens (or returns an already-open) buffer for `path`, delegating to the
    /// active variant.
    pub fn open_buffer(
        &mut self,
        path: impl AsRef<Path>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        match self {
            Worktree::Local(worktree) => worktree.open_buffer(path.as_ref(), cx),
            Worktree::Remote(worktree) => worktree.open_buffer(path.as_ref(), cx),
        }
    }

    /// Test helper: whether a live buffer is currently open at `path`.
    #[cfg(feature = "test-support")]
    pub fn has_open_buffer(&self, path: impl AsRef<Path>, cx: &AppContext) -> bool {
        // Remote buffers that are still pending (Operations) are not counted.
        let mut open_buffers: Box<dyn Iterator<Item = _>> = match self {
            Worktree::Local(worktree) => Box::new(worktree.open_buffers.values()),
            Worktree::Remote(worktree) => {
                Box::new(worktree.open_buffers.values().filter_map(|buf| {
                    if let RemoteBuffer::Loaded(buf) = buf {
                        Some(buf)
                    } else {
                        None
                    }
                }))
            }
        };

        let path = path.as_ref();
        open_buffers
            .find(|buffer| {
                if let Some(file) = buffer.upgrade(cx).and_then(|buffer| buffer.read(cx).file()) {
                    file.path.as_ref() == path
                } else {
                    false
                }
            })
            .is_some()
    }

    /// Applies remote editing operations to the buffer identified in the
    /// message. For remote worktrees, operations arriving before the buffer is
    /// loaded are queued as `RemoteBuffer::Operations` and replayed later.
    pub fn update_buffer(
        &mut self,
        envelope: proto::UpdateBuffer,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        let buffer_id = envelope.buffer_id as usize;
        let ops = envelope
            .operations
            .into_iter()
            .map(|op| op.try_into())
            .collect::<anyhow::Result<Vec<_>>>()?;

        match self {
            Worktree::Local(worktree) => {
                // On the host, the buffer must already exist.
                let buffer = worktree
                    .open_buffers
                    .get(&buffer_id)
                    .and_then(|buf| buf.upgrade(&cx))
                    .ok_or_else(|| {
                        anyhow!("invalid buffer {} in update buffer message", buffer_id)
                    })?;
                buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
            }
            Worktree::Remote(worktree) => match worktree.open_buffers.get_mut(&buffer_id) {
                Some(RemoteBuffer::Operations(pending_ops)) => pending_ops.extend(ops),
                Some(RemoteBuffer::Loaded(buffer)) => {
                    if let Some(buffer) = buffer.upgrade(&cx) {
                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
                    } else {
                        // The buffer was dropped; start queueing again.
                        worktree
                            .open_buffers
                            .insert(buffer_id, RemoteBuffer::Operations(ops));
                    }
                }
                None => {
                    worktree
                        .open_buffers
                        .insert(buffer_id, RemoteBuffer::Operations(ops));
                }
            },
        }

        Ok(())
    }

    /// Handles a `BufferSaved` notification on a remote worktree by updating
    /// the buffer's version and mtime. A message for an unknown buffer is
    /// silently ignored.
    ///
    /// NOTE(review): the `Err` branch fires when this is called on a *local*
    /// worktree, but the error text blames the buffer id — confirm intent.
    pub fn buffer_saved(
        &mut self,
        message: proto::BufferSaved,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        if let Worktree::Remote(worktree) = self {
            if let Some(buffer) = worktree
                .open_buffers
                .get(&(message.buffer_id as usize))
                .and_then(|buf| buf.upgrade(&cx))
            {
                buffer.update(cx, |buffer, cx| {
                    let version = message.version.try_into()?;
                    let mtime = message
                        .mtime
                        .ok_or_else(|| anyhow!("missing mtime"))?
                        .into();
                    buffer.did_save(version, mtime, cx);
                    Result::<_, anyhow::Error>::Ok(())
                })?;
            }
            Ok(())
        } else {
            Err(anyhow!(
                "invalid buffer {} in buffer saved message",
                message.buffer_id
            ))
        }
    }

    /// Refreshes `self.snapshot` from the authoritative source (the background
    /// scanner's snapshot for local trees, the watch channel for remote ones)
    /// and reconciles open buffers once scanning has settled.
    fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
        match self {
            Self::Local(worktree) => {
                let is_fake_fs = worktree.fs.is_fake();
                worktree.snapshot = worktree.background_snapshot.lock().clone();
                if worktree.is_scanning() {
                    if worktree.poll_task.is_none() {
                        // While scanning, re-poll periodically; with a fake fs
                        // (tests) just yield so the scan can make progress.
                        worktree.poll_task = Some(cx.spawn(|this, mut cx| async move {
                            if is_fake_fs {
                                smol::future::yield_now().await;
                            } else {
                                smol::Timer::after(Duration::from_millis(100)).await;
                            }
                            this.update(&mut cx, |this, cx| {
                                this.as_local_mut().unwrap().poll_task = None;
                                this.poll_snapshot(cx);
                            })
                        }));
                    }
                } else {
                    worktree.poll_task.take();
                    self.update_open_buffers(cx);
                }
            }
            Self::Remote(worktree) => {
                worktree.snapshot = worktree.snapshot_rx.borrow().clone();
                self.update_open_buffers(cx);
            }
        };

        cx.notify();
    }

    /// Reconciles every open buffer's `File` metadata with the current
    /// snapshot: follows renames, reloads clean buffers whose mtime changed,
    /// re-binds files whose entries reappeared, marks files deleted, and
    /// prunes entries for buffers that have been dropped.
    fn update_open_buffers(&mut self, cx: &mut ModelContext<Self>) {
        let open_buffers: Box<dyn Iterator<Item = _>> = match &self {
            Self::Local(worktree) => Box::new(worktree.open_buffers.iter()),
            Self::Remote(worktree) => {
                Box::new(worktree.open_buffers.iter().filter_map(|(id, buf)| {
                    if let RemoteBuffer::Loaded(buf) = buf {
                        Some((id, buf))
                    } else {
                        None
                    }
                }))
            }
        };

        let mut buffers_to_delete = Vec::new();
        for (buffer_id, buffer) in open_buffers {
            if let Some(buffer) = buffer.upgrade(&cx) {
                buffer.update(cx, |buffer, cx| {
                    let buffer_is_clean = !buffer.is_dirty();

                    if let Some(file) = buffer.file_mut() {
                        let mut file_changed = false;

                        if let Some(entry) = file
                            .entry_id
                            .and_then(|entry_id| self.entry_for_id(entry_id))
                        {
                            // Entry still exists: track renames and mtime changes.
                            if entry.path != file.path {
                                file.path = entry.path.clone();
                                file_changed = true;
                            }

                            if entry.mtime != file.mtime {
                                file.mtime = entry.mtime;
                                file_changed = true;
                                // Only reload from disk if the buffer has no
                                // unsaved edits (never clobber dirty buffers).
                                if let Some(worktree) = self.as_local() {
                                    if buffer_is_clean {
                                        let abs_path = worktree.absolutize(&file.path);
                                        refresh_buffer(abs_path, &worktree.fs, cx);
                                    }
                                }
                            }
                        } else if let Some(entry) = self.entry_for_path(&file.path) {
                            // Entry id is gone but the path exists again:
                            // re-bind the file to the new entry.
                            file.entry_id = Some(entry.id);
                            file.mtime = entry.mtime;
                            if let Some(worktree) = self.as_local() {
                                if buffer_is_clean {
                                    let abs_path = worktree.absolutize(&file.path);
                                    refresh_buffer(abs_path, &worktree.fs, cx);
                                }
                            }
                            file_changed = true;
                        } else if !file.is_deleted() {
                            // File vanished from the tree: a clean buffer
                            // becomes dirty so its contents aren't lost silently.
                            if buffer_is_clean {
                                cx.emit(editor::buffer::Event::Dirtied);
                            }
                            file.entry_id = None;
                            file_changed = true;
                        }

                        if file_changed {
                            cx.emit(editor::buffer::Event::FileHandleChanged);
                        }
                    }
                });
            } else {
                buffers_to_delete.push(*buffer_id);
            }
        }

        for buffer_id in buffers_to_delete {
            match self {
                Self::Local(worktree) => {
                    worktree.open_buffers.remove(&buffer_id);
                }
                Self::Remote(worktree) => {
                    worktree.open_buffers.remove(&buffer_id);
                }
            }
        }
    }
}
549
550impl Deref for Worktree {
551 type Target = Snapshot;
552
553 fn deref(&self) -> &Self::Target {
554 match self {
555 Worktree::Local(worktree) => &worktree.snapshot,
556 Worktree::Remote(worktree) => &worktree.snapshot,
557 }
558 }
559}
560
/// A worktree backed by the local filesystem.
pub struct LocalWorktree {
    // Foreground copy of the tree state, refreshed by `poll_snapshot`.
    snapshot: Snapshot,
    // Snapshot owned by the background scanner; the source of truth.
    background_snapshot: Arc<Mutex<Snapshot>>,
    // Present while the worktree is shared; completed scans are sent here to
    // be diffed and broadcast to peers.
    snapshots_to_send_tx: Option<Sender<Snapshot>>,
    // Latest scan state, observable by `is_scanning`/`scan_complete`.
    last_scan_state_rx: watch::Receiver<ScanState>,
    // Keeps the background scanner alive for the lifetime of the worktree.
    _background_scanner_task: Option<Task<()>>,
    // Pending re-poll task, set only while a scan is in progress.
    poll_task: Option<Task<()>>,
    // Set once the worktree is shared: the client plus the server-assigned id.
    rpc: Option<(rpc::Client, u64)>,
    // Weakly-held open buffers, keyed by buffer id.
    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
    // Strong handles to buffers opened on behalf of each remote peer.
    shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
    // Collaborating peers, by replica id.
    peers: HashMap<PeerId, ReplicaId>,
    languages: Arc<LanguageRegistry>,
    fs: Arc<dyn Fs>,
}
575
impl LocalWorktree {
    /// Creates the worktree model for `path` and returns it along with the
    /// sender the background scanner will use to report scan-state changes.
    /// The scanner itself is started by `Worktree::open_local`.
    async fn new(
        path: impl Into<Arc<Path>>,
        languages: Arc<LanguageRegistry>,
        fs: Arc<dyn Fs>,
        cx: &mut AsyncAppContext,
    ) -> Result<(ModelHandle<Worktree>, Sender<ScanState>)> {
        let abs_path = path.into();
        // Inside the snapshot the root is always the empty relative path.
        let path: Arc<Path> = Arc::from(Path::new(""));
        let next_entry_id = AtomicUsize::new(0);

        // After determining whether the root entry is a file or a directory, populate the
        // snapshot's "root name", which will be used for the purpose of fuzzy matching.
        let mut root_name = abs_path
            .file_name()
            .map_or(String::new(), |f| f.to_string_lossy().to_string());
        let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
        let metadata = fs
            .metadata(&abs_path)
            .await?
            .ok_or_else(|| anyhow!("root entry does not exist"))?;
        if metadata.is_dir {
            root_name.push('/');
        }

        let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
        let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
        let tree = cx.add_model(move |cx: &mut ModelContext<Worktree>| {
            let mut snapshot = Snapshot {
                id: cx.model_id(),
                scan_id: 0,
                abs_path,
                root_name,
                root_char_bag,
                ignores: Default::default(),
                entries_by_path: Default::default(),
                entries_by_id: Default::default(),
                removed_entry_ids: Default::default(),
                next_entry_id: Arc::new(next_entry_id),
            };
            snapshot.insert_entry(Entry::new(
                path.into(),
                &metadata,
                &snapshot.next_entry_id,
                snapshot.root_char_bag,
            ));

            let tree = Self {
                snapshot: snapshot.clone(),
                background_snapshot: Arc::new(Mutex::new(snapshot)),
                snapshots_to_send_tx: None,
                last_scan_state_rx,
                _background_scanner_task: None,
                poll_task: None,
                open_buffers: Default::default(),
                shared_buffers: Default::default(),
                peers: Default::default(),
                rpc: None,
                languages,
                fs,
            };

            // Forward scanner state changes into `last_scan_state_rx`, re-poll
            // the snapshot, and — when sharing and the scan just finished —
            // hand the fresh snapshot off to be broadcast to peers.
            cx.spawn_weak(|this, mut cx| async move {
                while let Ok(scan_state) = scan_states_rx.recv().await {
                    if let Some(handle) = cx.read(|cx| this.upgrade(&cx)) {
                        let to_send = handle.update(&mut cx, |this, cx| {
                            last_scan_state_tx.blocking_send(scan_state).ok();
                            this.poll_snapshot(cx);
                            let tree = this.as_local_mut().unwrap();
                            if !tree.is_scanning() {
                                if let Some(snapshots_to_send_tx) =
                                    tree.snapshots_to_send_tx.clone()
                                {
                                    Some((tree.snapshot(), snapshots_to_send_tx))
                                } else {
                                    None
                                }
                            } else {
                                None
                            }
                        });

                        if let Some((snapshot, snapshots_to_send_tx)) = to_send {
                            if let Err(err) = snapshots_to_send_tx.send(snapshot).await {
                                log::error!("error submitting snapshot to send {}", err);
                            }
                        }
                    } else {
                        break;
                    }
                }
            })
            .detach();

            Worktree::Local(tree)
        });

        Ok((tree, scan_states_tx))
    }

    /// Opens a buffer for `path`, returning an existing live buffer for the
    /// same path if one is already open; otherwise loads the file from disk.
    pub fn open_buffer(
        &mut self,
        path: &Path,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        let handle = cx.handle();

        // If there is already a buffer for the given path, then return it.
        // The `retain` doubles as garbage collection of dropped buffers.
        let mut existing_buffer = None;
        self.open_buffers.retain(|_buffer_id, buffer| {
            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                if let Some(file) = buffer.read(cx.as_ref()).file() {
                    if file.worktree_id() == handle.id() && file.path.as_ref() == path {
                        existing_buffer = Some(buffer);
                    }
                }
                true
            } else {
                false
            }
        });

        let languages = self.languages.clone();
        let path = Arc::from(path);
        cx.spawn(|this, mut cx| async move {
            if let Some(existing_buffer) = existing_buffer {
                Ok(existing_buffer)
            } else {
                let (file, contents) = this
                    .update(&mut cx, |this, cx| this.as_local().unwrap().load(&path, cx))
                    .await?;
                let language = languages.select_language(&path).cloned();
                let buffer = cx.add_model(|cx| {
                    Buffer::from_history(0, History::new(contents.into()), Some(file), language, cx)
                });
                this.update(&mut cx, |this, _| {
                    let this = this
                        .as_local_mut()
                        .ok_or_else(|| anyhow!("must be a local worktree"))?;
                    this.open_buffers.insert(buffer.id(), buffer.downgrade());
                    Ok(buffer)
                })
            }
        })
    }

    /// Opens a buffer on behalf of a remote peer and returns its serialized
    /// state. A strong handle is retained in `shared_buffers` so the buffer
    /// stays alive while the peer has it open.
    pub fn open_remote_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::OpenBuffer>,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<proto::OpenBufferResponse>> {
        let peer_id = envelope.original_sender_id();
        let path = Path::new(&envelope.payload.path);

        let buffer = self.open_buffer(path, cx);

        cx.spawn(|this, mut cx| async move {
            let buffer = buffer.await?;
            this.update(&mut cx, |this, cx| {
                this.as_local_mut()
                    .unwrap()
                    .shared_buffers
                    .entry(peer_id?)
                    .or_default()
                    .insert(buffer.id() as u64, buffer.clone());

                Ok(proto::OpenBufferResponse {
                    buffer: Some(buffer.update(cx.as_mut(), |buffer, cx| buffer.to_proto(cx))),
                })
            })
        })
    }

    /// Releases the strong handle held for a peer's buffer when that peer
    /// closes it.
    pub fn close_remote_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::CloseBuffer>,
        _: &mut ModelContext<Worktree>,
    ) -> Result<()> {
        if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
            shared_buffers.remove(&envelope.payload.buffer_id);
        }

        Ok(())
    }

    /// Records a newly-joined peer and notifies observers.
    pub fn add_peer(
        &mut self,
        envelope: TypedEnvelope<proto::AddPeer>,
        cx: &mut ModelContext<Worktree>,
    ) -> Result<()> {
        let peer = envelope.payload.peer.ok_or_else(|| anyhow!("empty peer"))?;
        self.peers
            .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
        cx.notify();
        Ok(())
    }

    /// Removes a departed peer: drops its shared buffers and clears its
    /// replica's presence from every open buffer.
    pub fn remove_peer(
        &mut self,
        envelope: TypedEnvelope<proto::RemovePeer>,
        cx: &mut ModelContext<Worktree>,
    ) -> Result<()> {
        let peer_id = PeerId(envelope.payload.peer_id);
        let replica_id = self
            .peers
            .remove(&peer_id)
            .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
        self.shared_buffers.remove(&peer_id);
        for (_, buffer) in &self.open_buffers {
            if let Some(buffer) = buffer.upgrade(&cx) {
                buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
            }
        }
        cx.notify();
        Ok(())
    }

    /// Resolves once the current background scan has finished (i.e. the scan
    /// state leaves `Scanning`).
    pub fn scan_complete(&self) -> impl Future<Output = ()> {
        let mut scan_state_rx = self.last_scan_state_rx.clone();
        async move {
            let mut scan_state = Some(scan_state_rx.borrow().clone());
            while let Some(ScanState::Scanning) = scan_state {
                scan_state = scan_state_rx.recv().await;
            }
        }
    }

    /// Whether the background scanner is currently running.
    fn is_scanning(&self) -> bool {
        if let ScanState::Scanning = *self.last_scan_state_rx.borrow() {
            true
        } else {
            false
        }
    }

    /// Clones the current foreground snapshot.
    pub fn snapshot(&self) -> Snapshot {
        self.snapshot.clone()
    }

    /// The absolute root path of the worktree on disk.
    pub fn abs_path(&self) -> &Path {
        self.snapshot.abs_path.as_ref()
    }

    /// Whether `path` lies within this worktree's root.
    pub fn contains_abs_path(&self, path: &Path) -> bool {
        path.starts_with(&self.snapshot.abs_path)
    }

    /// Converts a worktree-relative path to an absolute one. The root entry's
    /// relative path is empty (no file name), which maps to the root itself.
    fn absolutize(&self, path: &Path) -> PathBuf {
        if path.file_name().is_some() {
            self.snapshot.abs_path.join(path)
        } else {
            self.snapshot.abs_path.to_path_buf()
        }
    }

    /// Loads the file at `path` from disk, refreshing its snapshot entry so
    /// the returned `File` metadata is current.
    fn load(&self, path: &Path, cx: &mut ModelContext<Worktree>) -> Task<Result<(File, String)>> {
        let handle = cx.handle();
        let path = Arc::from(path);
        let abs_path = self.absolutize(&path);
        let background_snapshot = self.background_snapshot.clone();
        let fs = self.fs.clone();
        cx.spawn(|this, mut cx| async move {
            let text = fs.load(&abs_path).await?;
            // Eagerly populate the snapshot with an updated entry for the loaded file
            let entry = refresh_entry(fs.as_ref(), &background_snapshot, path, &abs_path).await?;
            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
            Ok((File::new(entry.id, handle, entry.path, entry.mtime), text))
        })
    }

    /// Saves `text` to `path` and re-registers the buffer under the (possibly
    /// new) path, returning the resulting `File` metadata.
    pub fn save_buffer_as(
        &self,
        buffer: ModelHandle<Buffer>,
        path: impl Into<Arc<Path>>,
        text: Rope,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<File>> {
        let save = self.save(path, text, cx);
        cx.spawn(|this, mut cx| async move {
            let entry = save.await?;
            this.update(&mut cx, |this, cx| {
                this.as_local_mut()
                    .unwrap()
                    .open_buffers
                    .insert(buffer.id(), buffer.downgrade());
                Ok(File::new(entry.id, cx.handle(), entry.path, entry.mtime))
            })
        })
    }

    /// Writes `text` to disk at `path`, refreshes the corresponding snapshot
    /// entry, and re-polls the worktree.
    fn save(
        &self,
        path: impl Into<Arc<Path>>,
        text: Rope,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<Entry>> {
        let path = path.into();
        let abs_path = self.absolutize(&path);
        let background_snapshot = self.background_snapshot.clone();
        let fs = self.fs.clone();
        let save = cx.background().spawn(async move {
            fs.save(&abs_path, &text).await?;
            refresh_entry(fs.as_ref(), &background_snapshot, path.clone(), &abs_path).await
        });

        cx.spawn(|this, mut cx| async move {
            let entry = save.await?;
            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
            Ok(entry)
        })
    }

    /// Shares this worktree with remote collaborators: sends the full tree to
    /// the server, registers the worktree, and starts a background task that
    /// broadcasts incremental snapshot diffs as scans complete. Returns the
    /// server-assigned worktree id and access token.
    pub fn share(
        &mut self,
        rpc: rpc::Client,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<anyhow::Result<(u64, String)>> {
        let snapshot = self.snapshot();
        let share_request = self.share_request(cx);
        let handle = cx.handle();
        cx.spawn(|this, mut cx| async move {
            let share_request = share_request.await;
            let share_response = rpc.request(share_request).await?;

            rpc.state
                .write()
                .await
                .shared_worktrees
                .insert(share_response.worktree_id, handle.downgrade());

            log::info!("sharing worktree {:?}", share_response);
            let (snapshots_to_send_tx, snapshots_to_send_rx) =
                smol::channel::unbounded::<Snapshot>();

            // Diff each completed snapshot against the last one we sent, and
            // push only the delta to the server.
            cx.background()
                .spawn({
                    let rpc = rpc.clone();
                    let worktree_id = share_response.worktree_id;
                    async move {
                        let mut prev_snapshot = snapshot;
                        while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
                            let message = snapshot.build_update(&prev_snapshot, worktree_id);
                            match rpc.send(message).await {
                                Ok(()) => prev_snapshot = snapshot,
                                Err(err) => log::error!("error sending snapshot diff {}", err),
                            }
                        }
                    }
                })
                .detach();

            this.update(&mut cx, |worktree, _| {
                let worktree = worktree.as_local_mut().unwrap();
                worktree.rpc = Some((rpc, share_response.worktree_id));
                worktree.snapshots_to_send_tx = Some(snapshots_to_send_tx);
            });

            Ok((share_response.worktree_id, share_response.access_token))
        })
    }

    /// Serializes the full current snapshot into a `ShareWorktree` request on
    /// a background thread.
    fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<proto::ShareWorktree> {
        let snapshot = self.snapshot();
        let root_name = self.root_name.clone();
        cx.background().spawn(async move {
            let entries = snapshot
                .entries_by_path
                .cursor::<(), ()>()
                .map(Into::into)
                .collect();
            proto::ShareWorktree {
                worktree: Some(proto::Worktree { root_name, entries }),
            }
        })
    }
}
952
953pub fn refresh_buffer(abs_path: PathBuf, fs: &Arc<dyn Fs>, cx: &mut ModelContext<Buffer>) {
954 let fs = fs.clone();
955 cx.spawn(|buffer, mut cx| async move {
956 let new_text = fs.load(&abs_path).await;
957 match new_text {
958 Err(error) => log::error!("error refreshing buffer after file changed: {}", error),
959 Ok(new_text) => {
960 buffer
961 .update(&mut cx, |buffer, cx| {
962 buffer.set_text_from_disk(new_text.into(), cx)
963 })
964 .await;
965 }
966 }
967 })
968 .detach()
969}
970
971impl Deref for LocalWorktree {
972 type Target = Snapshot;
973
974 fn deref(&self) -> &Self::Target {
975 &self.snapshot
976 }
977}
978
979impl fmt::Debug for LocalWorktree {
980 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
981 self.snapshot.fmt(f)
982 }
983}
984
/// A worktree replicated from a remote host over RPC.
pub struct RemoteWorktree {
    // Server-assigned id of the shared worktree.
    remote_id: u64,
    // Foreground copy of the tree state, refreshed by `poll_snapshot`.
    snapshot: Snapshot,
    // Receives snapshots produced by the background update-applying task.
    snapshot_rx: watch::Receiver<Snapshot>,
    rpc: rpc::Client,
    // Incoming `UpdateWorktree` diffs are queued here for background application.
    updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
    // This client's replica id within the collaboration session.
    replica_id: ReplicaId,
    // Buffers keyed by remote buffer id; may hold queued operations for
    // buffers that are still loading.
    open_buffers: HashMap<usize, RemoteBuffer>,
    // Collaborating peers, by replica id.
    peers: HashMap<PeerId, ReplicaId>,
    languages: Arc<LanguageRegistry>,
}
996
impl RemoteWorktree {
    /// Opens a buffer for `path`: returns an existing live buffer if one is
    /// open, otherwise requests the buffer from the host over RPC and replays
    /// any operations that arrived while it was loading.
    pub fn open_buffer(
        &mut self,
        path: &Path,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        let handle = cx.handle();
        // Look for an existing buffer at this path; the `retain` doubles as
        // garbage collection of dropped buffers.
        let mut existing_buffer = None;
        self.open_buffers.retain(|_buffer_id, buffer| {
            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                if let Some(file) = buffer.read(cx.as_ref()).file() {
                    if file.worktree_id() == handle.id() && file.path.as_ref() == path {
                        existing_buffer = Some(buffer);
                    }
                }
                true
            } else {
                false
            }
        });

        let rpc = self.rpc.clone();
        let languages = self.languages.clone();
        let replica_id = self.replica_id;
        let remote_worktree_id = self.remote_id;
        // The path travels over the wire as a string (lossy on non-UTF-8 names).
        let path = path.to_string_lossy().to_string();
        cx.spawn(|this, mut cx| async move {
            if let Some(existing_buffer) = existing_buffer {
                Ok(existing_buffer)
            } else {
                let entry = this
                    .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned())
                    .ok_or_else(|| anyhow!("file does not exist"))?;
                let file = File::new(entry.id, handle, entry.path, entry.mtime);
                let language = languages.select_language(&path).cloned();
                let response = rpc
                    .request(proto::OpenBuffer {
                        worktree_id: remote_worktree_id as u64,
                        path,
                    })
                    .await?;
                let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
                let buffer_id = remote_buffer.id as usize;
                let buffer = cx.add_model(|cx| {
                    Buffer::from_proto(replica_id, remote_buffer, Some(file), language, cx).unwrap()
                });
                this.update(&mut cx, |this, cx| {
                    let this = this.as_remote_mut().unwrap();
                    // If operations arrived for this buffer while the RPC
                    // request was in flight, apply them now.
                    if let Some(RemoteBuffer::Operations(pending_ops)) = this
                        .open_buffers
                        .insert(buffer_id, RemoteBuffer::Loaded(buffer.downgrade()))
                    {
                        buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?;
                    }
                    Result::<_, anyhow::Error>::Ok(())
                })?;
                Ok(buffer)
            }
        })
    }

    /// Clones the current foreground snapshot.
    fn snapshot(&self) -> Snapshot {
        self.snapshot.clone()
    }

    /// Records a newly-joined peer and notifies observers.
    pub fn add_peer(
        &mut self,
        envelope: TypedEnvelope<proto::AddPeer>,
        cx: &mut ModelContext<Worktree>,
    ) -> Result<()> {
        let peer = envelope.payload.peer.ok_or_else(|| anyhow!("empty peer"))?;
        self.peers
            .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
        cx.notify();
        Ok(())
    }

    /// Removes a departed peer and clears its replica's presence from every
    /// open buffer.
    pub fn remove_peer(
        &mut self,
        envelope: TypedEnvelope<proto::RemovePeer>,
        cx: &mut ModelContext<Worktree>,
    ) -> Result<()> {
        let peer_id = PeerId(envelope.payload.peer_id);
        let replica_id = self
            .peers
            .remove(&peer_id)
            .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
        for (_, buffer) in &self.open_buffers {
            if let Some(buffer) = buffer.upgrade(&cx) {
                buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
            }
        }
        cx.notify();
        Ok(())
    }
}
1093
/// State of a buffer on a remote worktree.
enum RemoteBuffer {
    /// The buffer is still being opened; operations received in the meantime
    /// are queued here and replayed once it loads.
    Operations(Vec<Operation>),
    /// The buffer is open; held weakly so closing it locally drops it.
    Loaded(WeakModelHandle<Buffer>),
}
1098
1099impl RemoteBuffer {
1100 fn upgrade(&self, cx: impl AsRef<AppContext>) -> Option<ModelHandle<Buffer>> {
1101 match self {
1102 Self::Operations(_) => None,
1103 Self::Loaded(buffer) => buffer.upgrade(cx),
1104 }
1105 }
1106}
1107
/// An immutable point-in-time view of a worktree's entries.
#[derive(Clone)]
pub struct Snapshot {
    // Model id of the owning worktree.
    id: usize,
    // Incremented on each applied update; used to detect stale entries.
    scan_id: usize,
    // Absolute root path (empty for remote worktrees).
    abs_path: Arc<Path>,
    // Root directory name; ends with '/' when the root is a directory.
    root_name: String,
    // Lowercased characters of the root name, for fuzzy matching.
    root_char_bag: CharBag,
    // Parsed gitignore files keyed by containing directory, with the scan id
    // at which each was last seen.
    ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
    // Entries ordered by path.
    entries_by_path: SumTree<Entry>,
    // Entry paths indexed by entry id, for rename/update tracking.
    entries_by_id: SumTree<PathEntry>,
    // Maps removed entries to ids for reuse — presumably to keep ids stable
    // across remove/re-add within a scan; confirm against the scanner code.
    removed_entry_ids: HashMap<u64, usize>,
    // Source of fresh entry ids, shared with the background scanner.
    next_entry_id: Arc<AtomicUsize>,
}
1121
impl Snapshot {
    /// Builds a protobuf diff that transforms `other` (an older snapshot of
    /// the same worktree) into `self`, by merge-walking both id-ordered entry
    /// trees in lockstep.
    pub fn build_update(&self, other: &Self, worktree_id: u64) -> proto::UpdateWorktree {
        let mut updated_entries = Vec::new();
        let mut removed_entries = Vec::new();
        let mut self_entries = self.entries_by_id.cursor::<(), ()>().peekable();
        let mut other_entries = other.entries_by_id.cursor::<(), ()>().peekable();
        loop {
            match (self_entries.peek(), other_entries.peek()) {
                (Some(self_entry), Some(other_entry)) => match self_entry.id.cmp(&other_entry.id) {
                    // Only in self: a new entry.
                    Ordering::Less => {
                        let entry = self.entry_for_id(self_entry.id).unwrap().into();
                        updated_entries.push(entry);
                        self_entries.next();
                    }
                    Ordering::Equal => {
                        // In both: only re-send if a newer scan touched it.
                        if self_entry.scan_id != other_entry.scan_id {
                            let entry = self.entry_for_id(self_entry.id).unwrap().into();
                            updated_entries.push(entry);
                        }

                        self_entries.next();
                        other_entries.next();
                    }
                    // Only in other: the entry was removed.
                    Ordering::Greater => {
                        removed_entries.push(other_entry.id as u64);
                        other_entries.next();
                    }
                },
                (Some(self_entry), None) => {
                    let entry = self.entry_for_id(self_entry.id).unwrap().into();
                    updated_entries.push(entry);
                    self_entries.next();
                }
                (None, Some(other_entry)) => {
                    removed_entries.push(other_entry.id as u64);
                    other_entries.next();
                }
                (None, None) => break,
            }
        }

        proto::UpdateWorktree {
            updated_entries,
            removed_entries,
            worktree_id,
        }
    }

    /// Applies a diff received from a peer, bumping `scan_id` so these
    /// changes are themselves visible to later `build_update` calls.
    ///
    /// Errors if a removed entry id is unknown to this snapshot.
    fn apply_update(&mut self, update: proto::UpdateWorktree) -> Result<()> {
        self.scan_id += 1;
        let scan_id = self.scan_id;

        let mut entries_by_path_edits = Vec::new();
        let mut entries_by_id_edits = Vec::new();
        for entry_id in update.removed_entries {
            let entry_id = entry_id as usize;
            let entry = self
                .entry_for_id(entry_id)
                .ok_or_else(|| anyhow!("unknown entry"))?;
            entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
            entries_by_id_edits.push(Edit::Remove(entry.id));
        }

        for entry in update.updated_entries {
            let entry = Entry::try_from((&self.root_char_bag, entry))?;
            // If an entry with this id already exists (possibly at a different
            // path, i.e. a rename), drop its stale path key first.
            if let Some(PathEntry { path, .. }) = self.entries_by_id.get(&entry.id, &()) {
                entries_by_path_edits.push(Edit::Remove(PathKey(path.clone())));
            }
            entries_by_id_edits.push(Edit::Insert(PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                scan_id,
            }));
            entries_by_path_edits.push(Edit::Insert(entry));
        }

        self.entries_by_path.edit(entries_by_path_edits, &());
        self.entries_by_id.edit(entries_by_id_edits, &());

        Ok(())
    }

    /// Total number of file entries (directories excluded).
    pub fn file_count(&self) -> usize {
        self.entries_by_path.summary().file_count
    }

    /// Number of file entries that are not gitignored.
    pub fn visible_file_count(&self) -> usize {
        self.entries_by_path.summary().visible_file_count
    }

    /// Iterates all files in path order, starting at the `start`-th file.
    pub fn files(&self, start: usize) -> FileIter {
        FileIter::all(self, start)
    }

    /// Iterates the paths of all entries, skipping the root entry itself
    /// (whose path is the empty path).
    pub fn paths(&self) -> impl Iterator<Item = &Arc<Path>> {
        let empty_path = Path::new("");
        self.entries_by_path
            .cursor::<(), ()>()
            .filter(move |entry| entry.path.as_ref() != empty_path)
            .map(|entry| entry.path())
    }

    /// Iterates non-ignored files, starting at the `start`-th visible file.
    pub fn visible_files(&self, start: usize) -> FileIter {
        FileIter::visible(self, start)
    }

    /// Iterates the direct children of the directory at `path`.
    fn child_entries<'a>(&'a self, path: &'a Path) -> ChildEntriesIter<'a> {
        ChildEntriesIter::new(path, self)
    }

    /// Entry for the worktree root. Panics if the root hasn't been inserted
    /// into the snapshot yet.
    pub fn root_entry(&self) -> &Entry {
        self.entry_for_path("").unwrap()
    }

    /// Returns the filename of the snapshot's root, plus a trailing slash if the snapshot's root is
    /// a directory.
    pub fn root_name(&self) -> &str {
        &self.root_name
    }

    /// Looks up an entry by worktree-relative path.
    fn entry_for_path(&self, path: impl AsRef<Path>) -> Option<&Entry> {
        let mut cursor = self.entries_by_path.cursor::<_, ()>();
        // `seek` returns whether an exact match was found.
        if cursor.seek(&PathSearch::Exact(path.as_ref()), Bias::Left, &()) {
            cursor.item()
        } else {
            None
        }
    }

    /// Looks up an entry by id, going through the id tree to recover its path.
    fn entry_for_id(&self, id: usize) -> Option<&Entry> {
        let entry = self.entries_by_id.get(&id, &())?;
        self.entry_for_path(&entry.path)
    }

    /// Inode of the entry at `path`, if any.
    pub fn inode_for_path(&self, path: impl AsRef<Path>) -> Option<u64> {
        self.entry_for_path(path.as_ref()).map(|e| e.inode())
    }

    /// Inserts (or replaces) a single entry in both trees, returning the
    /// entry with its final id. If the entry is a `.gitignore` file, its
    /// rules are (re)loaded into `self.ignores` for the parent directory.
    fn insert_entry(&mut self, mut entry: Entry) -> Entry {
        if !entry.is_dir() && entry.path().file_name() == Some(&GITIGNORE) {
            let (ignore, err) = Gitignore::new(self.abs_path.join(entry.path()));
            if let Some(err) = err {
                // A malformed ignore file still yields partial rules; log and proceed.
                log::error!("error in ignore file {:?} - {:?}", entry.path(), err);
            }

            let ignore_dir_path = entry.path().parent().unwrap();
            self.ignores
                .insert(ignore_dir_path.into(), (Arc::new(ignore), self.scan_id));
        }

        self.reuse_entry_id(&mut entry);
        self.entries_by_path.insert_or_replace(entry.clone(), &());
        self.entries_by_id.insert_or_replace(
            PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                scan_id: self.scan_id,
            },
            &(),
        );
        entry
    }

    /// Records the children of `parent_path` produced by a directory scan.
    /// The parent must currently be `PendingDir`; it is promoted to `Dir`.
    /// `ignore` is the directory's own `.gitignore`, if one was found.
    fn populate_dir(
        &mut self,
        parent_path: Arc<Path>,
        entries: impl IntoIterator<Item = Entry>,
        ignore: Option<Arc<Gitignore>>,
    ) {
        let mut parent_entry = self
            .entries_by_path
            .get(&PathKey(parent_path.clone()), &())
            .unwrap()
            .clone();
        if let Some(ignore) = ignore {
            self.ignores.insert(parent_path, (ignore, self.scan_id));
        }
        if matches!(parent_entry.kind, EntryKind::PendingDir) {
            parent_entry.kind = EntryKind::Dir;
        } else {
            // populate_dir is only ever called once per scanned directory.
            unreachable!();
        }

        let mut entries_by_path_edits = vec![Edit::Insert(parent_entry)];
        let mut entries_by_id_edits = Vec::new();

        for mut entry in entries {
            self.reuse_entry_id(&mut entry);
            entries_by_id_edits.push(Edit::Insert(PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                scan_id: self.scan_id,
            }));
            entries_by_path_edits.push(Edit::Insert(entry));
        }

        self.entries_by_path.edit(entries_by_path_edits, &());
        self.entries_by_id.edit(entries_by_id_edits, &());
    }

    /// Keeps entry ids stable across deletions and re-creations: reuse the id
    /// of a recently removed entry with the same inode (rename detection), or
    /// of an existing entry at the same path.
    fn reuse_entry_id(&mut self, entry: &mut Entry) {
        if let Some(removed_entry_id) = self.removed_entry_ids.remove(&entry.inode) {
            entry.id = removed_entry_id;
        } else if let Some(existing_entry) = self.entry_for_path(&entry.path) {
            entry.id = existing_entry.id;
        }
    }

    /// Removes the entry at `path` together with its entire subtree,
    /// remembering the removed ids by inode so renames can reuse them.
    fn remove_path(&mut self, path: &Path) {
        let mut new_entries;
        let removed_entry_ids;
        {
            // Split the path tree into [before path] + [path..successor) + [after],
            // keeping the middle slice (the removed subtree) aside.
            let mut cursor = self.entries_by_path.cursor::<_, ()>();
            new_entries = cursor.slice(&PathSearch::Exact(path), Bias::Left, &());
            removed_entry_ids = cursor.slice(&PathSearch::Successor(path), Bias::Left, &());
            new_entries.push_tree(cursor.suffix(&()), &());
        }
        self.entries_by_path = new_entries;

        let mut entries_by_id_edits = Vec::new();
        for entry in removed_entry_ids.cursor::<(), ()>() {
            // Keep the largest id seen per inode for later reuse.
            let removed_entry_id = self
                .removed_entry_ids
                .entry(entry.inode)
                .or_insert(entry.id);
            *removed_entry_id = cmp::max(*removed_entry_id, entry.id);
            entries_by_id_edits.push(Edit::Remove(entry.id));
        }
        self.entries_by_id.edit(entries_by_id_edits, &());

        if path.file_name() == Some(&GITIGNORE) {
            // Mark the parent's ignore rules as touched this scan so
            // update_ignore_statuses re-evaluates that directory.
            if let Some((_, scan_id)) = self.ignores.get_mut(path.parent().unwrap()) {
                *scan_id = self.scan_id;
            }
        }
    }

    /// Builds the stack of gitignore rules in effect at `path`, walking its
    /// ancestors from the root downward. Returns `IgnoreStack::all()` as soon
    /// as any ancestor directory (or the path itself) is ignored.
    fn ignore_stack_for_path(&self, path: &Path, is_dir: bool) -> Arc<IgnoreStack> {
        let mut new_ignores = Vec::new();
        for ancestor in path.ancestors().skip(1) {
            if let Some((ignore, _)) = self.ignores.get(ancestor) {
                new_ignores.push((ancestor, Some(ignore.clone())));
            } else {
                new_ignores.push((ancestor, None));
            }
        }

        let mut ignore_stack = IgnoreStack::none();
        // Ancestors were collected leaf-to-root; apply them root-to-leaf.
        for (parent_path, ignore) in new_ignores.into_iter().rev() {
            if ignore_stack.is_path_ignored(&parent_path, true) {
                ignore_stack = IgnoreStack::all();
                break;
            } else if let Some(ignore) = ignore {
                ignore_stack = ignore_stack.append(Arc::from(parent_path), ignore);
            }
        }

        if ignore_stack.is_path_ignored(path, is_dir) {
            ignore_stack = IgnoreStack::all();
        }

        ignore_stack
    }
}
1386
1387impl fmt::Debug for Snapshot {
1388 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1389 for entry in self.entries_by_path.cursor::<(), ()>() {
1390 for _ in entry.path().ancestors().skip(1) {
1391 write!(f, " ")?;
1392 }
1393 writeln!(f, "{:?} (inode: {})", entry.path(), entry.inode())?;
1394 }
1395 Ok(())
1396 }
1397}
1398
/// A handle to a file within a worktree, as seen by an open buffer.
#[derive(Clone, PartialEq)]
pub struct File {
    /// `None` once the underlying entry has been deleted from disk.
    entry_id: Option<usize>,
    /// The worktree this file belongs to.
    worktree: ModelHandle<Worktree>,
    /// Path relative to the worktree root.
    pub path: Arc<Path>,
    /// Modification time captured when the file was last read or saved.
    pub mtime: SystemTime,
}
1406
impl File {
    /// Creates a handle for an existing (non-deleted) entry.
    pub fn new(
        entry_id: usize,
        worktree: ModelHandle<Worktree>,
        path: Arc<Path>,
        mtime: SystemTime,
    ) -> Self {
        Self {
            entry_id: Some(entry_id),
            worktree,
            path,
            mtime,
        }
    }

    /// Forwards a buffer operation to collaborators over rpc, if this
    /// worktree is shared (local with an rpc connection) or remote.
    /// Send errors are logged, not surfaced.
    pub fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) {
        self.worktree.update(cx, |worktree, cx| {
            // Local worktrees only have an rpc connection when shared;
            // remote worktrees always do.
            if let Some((rpc, remote_id)) = match worktree {
                Worktree::Local(worktree) => worktree.rpc.clone(),
                Worktree::Remote(worktree) => Some((worktree.rpc.clone(), worktree.remote_id)),
            } {
                cx.spawn(|_, _| async move {
                    if let Err(error) = rpc
                        .send(proto::UpdateBuffer {
                            worktree_id: remote_id,
                            buffer_id,
                            operations: Some(operation).iter().map(Into::into).collect(),
                        })
                        .await
                    {
                        log::error!("error sending buffer operation: {}", error);
                    }
                })
                .detach();
            }
        });
    }

    /// Notifies the host that this remote buffer was closed. No-op for
    /// local worktrees.
    pub fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) {
        self.worktree.update(cx, |worktree, cx| {
            if let Worktree::Remote(worktree) = worktree {
                let worktree_id = worktree.remote_id;
                let rpc = worktree.rpc.clone();
                cx.background()
                    .spawn(async move {
                        if let Err(error) = rpc
                            .send(proto::CloseBuffer {
                                worktree_id,
                                buffer_id,
                            })
                            .await
                        {
                            log::error!("error closing remote buffer: {}", error);
                        };
                    })
                    .detach();
            }
        });
    }

    /// Returns this file's path relative to the root of its worktree.
    pub fn path(&self) -> Arc<Path> {
        self.path.clone()
    }

    /// Absolute path on disk: worktree root joined with the relative path.
    pub fn abs_path(&self, cx: &AppContext) -> PathBuf {
        self.worktree.read(cx).abs_path.join(&self.path)
    }

    /// Returns the last component of this handle's absolute path. If this handle refers to the root
    /// of its worktree, then this method will return the name of the worktree itself.
    pub fn file_name<'a>(&'a self, cx: &'a AppContext) -> Option<OsString> {
        self.path
            .file_name()
            .or_else(|| Some(OsStr::new(self.worktree.read(cx).root_name())))
            .map(Into::into)
    }

    /// True once the underlying entry has been removed from disk.
    pub fn is_deleted(&self) -> bool {
        self.entry_id.is_none()
    }

    /// Inverse of `is_deleted`.
    pub fn exists(&self) -> bool {
        !self.is_deleted()
    }

    /// Saves the given buffer contents. On a local worktree, writes to disk
    /// (and notifies collaborators when shared); on a remote worktree,
    /// requests the host to save. Resolves to the saved version and mtime.
    pub fn save(
        &self,
        buffer_id: u64,
        text: Rope,
        version: time::Global,
        cx: &mut MutableAppContext,
    ) -> Task<Result<(time::Global, SystemTime)>> {
        self.worktree.update(cx, |worktree, cx| match worktree {
            Worktree::Local(worktree) => {
                let rpc = worktree.rpc.clone();
                let save = worktree.save(self.path.clone(), text, cx);
                cx.spawn(|_, _| async move {
                    let entry = save.await?;
                    if let Some((rpc, worktree_id)) = rpc {
                        rpc.send(proto::BufferSaved {
                            worktree_id,
                            buffer_id,
                            version: (&version).into(),
                            mtime: Some(entry.mtime.into()),
                        })
                        .await?;
                    }
                    Ok((version, entry.mtime))
                })
            }
            Worktree::Remote(worktree) => {
                let rpc = worktree.rpc.clone();
                let worktree_id = worktree.remote_id;
                cx.spawn(|_, _| async move {
                    let response = rpc
                        .request(proto::SaveBuffer {
                            worktree_id,
                            buffer_id,
                        })
                        .await?;
                    let version = response.version.try_into()?;
                    let mtime = response
                        .mtime
                        .ok_or_else(|| anyhow!("missing mtime"))?
                        .into();
                    Ok((version, mtime))
                })
            }
        })
    }

    /// Id of the worktree model that owns this file.
    pub fn worktree_id(&self) -> usize {
        self.worktree.id()
    }

    /// A (worktree id, path) pair identifying this file across worktrees.
    pub fn entry_id(&self) -> (usize, Arc<Path>) {
        (self.worktree.id(), self.path())
    }
}
1547
/// A single file or directory within a worktree snapshot.
#[derive(Clone, Debug)]
pub struct Entry {
    /// Stable id, preserved across renames when possible (see `reuse_entry_id`).
    id: usize,
    /// File vs. (possibly still-pending) directory.
    kind: EntryKind,
    /// Path relative to the worktree root.
    path: Arc<Path>,
    /// Filesystem inode, used to detect renames.
    inode: u64,
    /// Modification time from filesystem metadata.
    mtime: SystemTime,
    is_symlink: bool,
    /// Whether this entry is excluded by gitignore rules.
    is_ignored: bool,
}
1558
#[derive(Clone, Debug)]
pub enum EntryKind {
    /// A directory whose children have not been scanned yet.
    PendingDir,
    /// A fully scanned directory.
    Dir,
    /// A file, carrying its precomputed char bag for fuzzy matching.
    File(CharBag),
}
1565
1566impl Entry {
1567 fn new(
1568 path: Arc<Path>,
1569 metadata: &fs::Metadata,
1570 next_entry_id: &AtomicUsize,
1571 root_char_bag: CharBag,
1572 ) -> Self {
1573 Self {
1574 id: next_entry_id.fetch_add(1, SeqCst),
1575 kind: if metadata.is_dir {
1576 EntryKind::PendingDir
1577 } else {
1578 EntryKind::File(char_bag_for_path(root_char_bag, &path))
1579 },
1580 path,
1581 inode: metadata.inode,
1582 mtime: metadata.mtime,
1583 is_symlink: metadata.is_symlink,
1584 is_ignored: false,
1585 }
1586 }
1587
1588 pub fn path(&self) -> &Arc<Path> {
1589 &self.path
1590 }
1591
1592 pub fn inode(&self) -> u64 {
1593 self.inode
1594 }
1595
1596 pub fn is_ignored(&self) -> bool {
1597 self.is_ignored
1598 }
1599
1600 fn is_dir(&self) -> bool {
1601 matches!(self.kind, EntryKind::Dir | EntryKind::PendingDir)
1602 }
1603
1604 fn is_file(&self) -> bool {
1605 matches!(self.kind, EntryKind::File(_))
1606 }
1607}
1608
1609impl sum_tree::Item for Entry {
1610 type Summary = EntrySummary;
1611
1612 fn summary(&self) -> Self::Summary {
1613 let file_count;
1614 let visible_file_count;
1615 if self.is_file() {
1616 file_count = 1;
1617 if self.is_ignored {
1618 visible_file_count = 0;
1619 } else {
1620 visible_file_count = 1;
1621 }
1622 } else {
1623 file_count = 0;
1624 visible_file_count = 0;
1625 }
1626
1627 EntrySummary {
1628 max_path: self.path().clone(),
1629 file_count,
1630 visible_file_count,
1631 }
1632 }
1633}
1634
impl sum_tree::KeyedItem for Entry {
    type Key = PathKey;

    // Entries are keyed (and thus ordered) by their worktree-relative path.
    fn key(&self) -> Self::Key {
        PathKey(self.path().clone())
    }
}
1642
/// Sum-tree summary for `Entry`: the greatest path in the subtree plus
/// cumulative file counts, enabling path seeks and file-index seeks.
#[derive(Clone, Debug)]
pub struct EntrySummary {
    max_path: Arc<Path>,
    file_count: usize,
    visible_file_count: usize,
}
1649
1650impl Default for EntrySummary {
1651 fn default() -> Self {
1652 Self {
1653 max_path: Arc::from(Path::new("")),
1654 file_count: 0,
1655 visible_file_count: 0,
1656 }
1657 }
1658}
1659
impl sum_tree::Summary for EntrySummary {
    type Context = ();

    // Combining summaries left-to-right: the rightmost max_path wins
    // (paths are the tree's sort key), counts accumulate.
    fn add_summary(&mut self, rhs: &Self, _: &()) {
        self.max_path = rhs.max_path.clone();
        self.file_count += rhs.file_count;
        self.visible_file_count += rhs.visible_file_count;
    }
}
1669
/// Entry record in the id-ordered tree: maps an entry id back to its path
/// and remembers which scan last touched it (used for snapshot diffing).
#[derive(Clone, Debug)]
struct PathEntry {
    id: usize,
    path: Arc<Path>,
    scan_id: usize,
}
1676
impl sum_tree::Item for PathEntry {
    type Summary = PathEntrySummary;

    fn summary(&self) -> Self::Summary {
        PathEntrySummary { max_id: self.id }
    }
}
1684
impl sum_tree::KeyedItem for PathEntry {
    type Key = usize;

    // The id tree is keyed (and ordered) by entry id.
    fn key(&self) -> Self::Key {
        self.id
    }
}
1692
/// Summary for the id-ordered tree: the greatest id in a subtree.
#[derive(Clone, Debug, Default)]
struct PathEntrySummary {
    max_id: usize,
}
1697
impl sum_tree::Summary for PathEntrySummary {
    type Context = ();

    // Ids are the sort key, so the rightmost summary's max_id wins.
    fn add_summary(&mut self, summary: &Self, _: &Self::Context) {
        self.max_id = summary.max_id;
    }
}
1705
// Lets cursors over the id tree seek directly by entry id.
impl<'a> sum_tree::Dimension<'a, PathEntrySummary> for usize {
    fn add_summary(&mut self, summary: &'a PathEntrySummary, _: &()) {
        *self = summary.max_id;
    }
}
1711
/// Ordering key for the path-ordered entry tree (plain lexicographic path order).
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct PathKey(Arc<Path>);
1714
impl Default for PathKey {
    // The empty path, i.e. the worktree root.
    fn default() -> Self {
        Self(Path::new("").into())
    }
}
1720
// Lets cursors over the path tree seek by exact path key.
impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.0 = summary.max_path.clone();
    }
}
1726
1727#[derive(Copy, Clone, Debug, PartialEq, Eq)]
1728enum PathSearch<'a> {
1729 Exact(&'a Path),
1730 Successor(&'a Path),
1731}
1732
impl<'a> Ord for PathSearch<'a> {
    /// Partial-by-design ordering used only for cursor seeks:
    /// `Successor(a)` sorts after every path inside `a`'s subtree, and
    /// otherwise compares like `a` itself. The remaining combinations never
    /// occur because seek targets are only compared against tree summaries
    /// (which are always `Exact`).
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        match (self, other) {
            (Self::Exact(a), Self::Exact(b)) => a.cmp(b),
            (Self::Successor(a), Self::Exact(b)) => {
                if b.starts_with(a) {
                    cmp::Ordering::Greater
                } else {
                    a.cmp(b)
                }
            }
            _ => unreachable!("not sure we need the other two cases"),
        }
    }
}
1748
impl<'a> PartialOrd for PathSearch<'a> {
    // Delegates to `Ord`, which panics on the unsupported combinations.
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}
1754
1755impl<'a> Default for PathSearch<'a> {
1756 fn default() -> Self {
1757 Self::Exact(Path::new("").into())
1758 }
1759}
1760
// Allows a cursor to track its position as a PathSearch target; tree
// summaries always materialize as `Exact` positions.
impl<'a: 'b, 'b> sum_tree::Dimension<'a, EntrySummary> for PathSearch<'b> {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        *self = Self::Exact(summary.max_path.as_ref());
    }
}
1766
/// Cursor dimension counting file entries, used to seek to the nth file.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct FileCount(usize);
1769
impl<'a> sum_tree::Dimension<'a, EntrySummary> for FileCount {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.0 += summary.file_count;
    }
}
1775
/// Cursor dimension counting non-ignored file entries.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct VisibleFileCount(usize);
1778
impl<'a> sum_tree::Dimension<'a, EntrySummary> for VisibleFileCount {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.0 += summary.visible_file_count;
    }
}
1784
/// Scans the worktree directory tree on background threads and keeps the
/// shared snapshot up to date in response to filesystem events.
struct BackgroundScanner {
    fs: Arc<dyn Fs>,
    /// Snapshot shared with the foreground worktree model.
    snapshot: Arc<Mutex<Snapshot>>,
    /// Channel for reporting Scanning/Idle/Err transitions.
    notify: Sender<ScanState>,
    executor: Arc<executor::Background>,
}
1791
impl BackgroundScanner {
    fn new(
        snapshot: Arc<Mutex<Snapshot>>,
        notify: Sender<ScanState>,
        fs: Arc<dyn Fs>,
        executor: Arc<executor::Background>,
    ) -> Self {
        Self {
            fs,
            snapshot,
            notify,
            executor,
        }
    }

    /// Absolute path of the worktree root.
    fn abs_path(&self) -> Arc<Path> {
        self.snapshot.lock().abs_path.clone()
    }

    /// A clone of the current shared snapshot.
    fn snapshot(&self) -> Snapshot {
        self.snapshot.lock().clone()
    }

    /// Main loop: runs the initial scan, then processes batches of
    /// filesystem events, bracketing each unit of work with
    /// `Scanning`/`Idle` notifications. Returns when the notification
    /// receiver is dropped or event processing fails.
    async fn run(mut self, events_rx: impl Stream<Item = Vec<fsevent::Event>>) {
        if self.notify.send(ScanState::Scanning).await.is_err() {
            return;
        }

        if let Err(err) = self.scan_dirs().await {
            if self
                .notify
                .send(ScanState::Err(Arc::new(err)))
                .await
                .is_err()
            {
                return;
            }
        }

        if self.notify.send(ScanState::Idle).await.is_err() {
            return;
        }

        futures::pin_mut!(events_rx);
        while let Some(events) = events_rx.next().await {
            if self.notify.send(ScanState::Scanning).await.is_err() {
                break;
            }

            if !self.process_events(events).await {
                break;
            }

            if self.notify.send(ScanState::Idle).await.is_err() {
                break;
            }
        }
    }

    /// Performs the initial scan of the whole worktree, fanning directory
    /// jobs out over a work-stealing channel to one task per CPU. A
    /// single-file worktree needs no scan.
    async fn scan_dirs(&mut self) -> Result<()> {
        let root_char_bag;
        let next_entry_id;
        let is_dir;
        {
            let snapshot = self.snapshot.lock();
            root_char_bag = snapshot.root_char_bag;
            next_entry_id = snapshot.next_entry_id.clone();
            is_dir = snapshot.root_entry().is_dir();
        }

        if is_dir {
            let path: Arc<Path> = Arc::from(Path::new(""));
            let abs_path = self.abs_path();
            let (tx, rx) = channel::unbounded();
            // Seed the queue with the root; scan_dir enqueues subdirectories
            // onto the same channel via `scan_queue`.
            tx.send(ScanJob {
                abs_path: abs_path.to_path_buf(),
                path,
                ignore_stack: IgnoreStack::none(),
                scan_queue: tx.clone(),
            })
            .await
            .unwrap();
            // Drop our sender so the channel closes once all in-flight jobs
            // (each holding a sender clone) complete.
            drop(tx);

            self.executor
                .scoped(|scope| {
                    for _ in 0..self.executor.num_cpus() {
                        scope.spawn(async {
                            while let Ok(job) = rx.recv().await {
                                if let Err(err) = self
                                    .scan_dir(root_char_bag, next_entry_id.clone(), &job)
                                    .await
                                {
                                    log::error!("error scanning {:?}: {}", job.abs_path, err);
                                }
                            }
                        });
                    }
                })
                .await;
        }

        Ok(())
    }

    /// Scans a single directory: reads its children, computes their ignore
    /// status, records them in the snapshot via `populate_dir`, and enqueues
    /// child directories for scanning.
    async fn scan_dir(
        &self,
        root_char_bag: CharBag,
        next_entry_id: Arc<AtomicUsize>,
        job: &ScanJob,
    ) -> Result<()> {
        let mut new_entries: Vec<Entry> = Vec::new();
        let mut new_jobs: Vec<ScanJob> = Vec::new();
        let mut ignore_stack = job.ignore_stack.clone();
        let mut new_ignore = None;

        let mut child_paths = self.fs.read_dir(&job.abs_path).await?;
        while let Some(child_abs_path) = child_paths.next().await {
            let child_abs_path = match child_abs_path {
                Ok(child_abs_path) => child_abs_path,
                Err(error) => {
                    log::error!("error processing entry {:?}", error);
                    continue;
                }
            };
            let child_name = child_abs_path.file_name().unwrap();
            let child_path: Arc<Path> = job.path.join(child_name).into();
            let child_metadata = match self.fs.metadata(&child_abs_path).await? {
                Some(metadata) => metadata,
                // The child disappeared between readdir and stat; skip it.
                None => continue,
            };

            // If we find a .gitignore, add it to the stack of ignores used to determine which paths are ignored
            if child_name == *GITIGNORE {
                let (ignore, err) = Gitignore::new(&child_abs_path);
                if let Some(err) = err {
                    log::error!("error in ignore file {:?} - {:?}", child_name, err);
                }
                let ignore = Arc::new(ignore);
                ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
                new_ignore = Some(ignore);

                // Update the ignore status of any child entries we've already
                // processed to reflect the ignore file in this directory.
                // Because `.gitignore` starts with a `.`, it sorts early, so
                // such already-processed entries should be rare. Update the
                // ignore stack associated with any queued jobs as well.
                // NOTE(review): `new_jobs` and the directory entries in
                // `new_entries` are parallel sequences, hence the paired
                // iteration below.
                let mut new_jobs = new_jobs.iter_mut();
                for entry in &mut new_entries {
                    entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
                    if entry.is_dir() {
                        new_jobs.next().unwrap().ignore_stack = if entry.is_ignored {
                            IgnoreStack::all()
                        } else {
                            ignore_stack.clone()
                        };
                    }
                }
            }

            let mut child_entry = Entry::new(
                child_path.clone(),
                &child_metadata,
                &next_entry_id,
                root_char_bag,
            );

            if child_metadata.is_dir {
                let is_ignored = ignore_stack.is_path_ignored(&child_path, true);
                child_entry.is_ignored = is_ignored;
                new_entries.push(child_entry);
                new_jobs.push(ScanJob {
                    abs_path: child_abs_path,
                    path: child_path,
                    // Children of an ignored directory are all ignored.
                    ignore_stack: if is_ignored {
                        IgnoreStack::all()
                    } else {
                        ignore_stack.clone()
                    },
                    scan_queue: job.scan_queue.clone(),
                });
            } else {
                child_entry.is_ignored = ignore_stack.is_path_ignored(&child_path, false);
                new_entries.push(child_entry);
            };
        }

        self.snapshot
            .lock()
            .populate_dir(job.path.clone(), new_entries, new_ignore);
        for new_job in new_jobs {
            job.scan_queue.send(new_job).await.unwrap();
        }

        Ok(())
    }

    /// Applies a batch of filesystem events to the snapshot. Returns `false`
    /// if the worktree root is gone (which stops the run loop).
    async fn process_events(&mut self, mut events: Vec<fsevent::Event>) -> bool {
        // Work on a private clone; the shared snapshot is swapped atomically
        // once the batch's removals and insertions have been applied.
        let mut snapshot = self.snapshot();
        snapshot.scan_id += 1;

        let root_abs_path = if let Ok(abs_path) = self.fs.canonicalize(&snapshot.abs_path).await {
            abs_path
        } else {
            return false;
        };
        let root_char_bag = snapshot.root_char_bag;
        let next_entry_id = snapshot.next_entry_id.clone();

        // Sort by path, then drop events for paths covered by an ancestor's
        // event — rescanning the ancestor subsumes them.
        events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
        events.dedup_by(|a, b| a.path.starts_with(&b.path));

        // First pass: remove every affected subtree from the snapshot.
        for event in &events {
            match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => snapshot.remove_path(&path),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            }
        }

        // Second pass: re-insert whatever still exists on disk, queueing
        // directories for a full rescan.
        let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
        for event in events {
            let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => Arc::from(path.to_path_buf()),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            };

            match self.fs.metadata(&event.path).await {
                Ok(Some(metadata)) => {
                    let ignore_stack = snapshot.ignore_stack_for_path(&path, metadata.is_dir);
                    let mut fs_entry = Entry::new(
                        path.clone(),
                        &metadata,
                        snapshot.next_entry_id.as_ref(),
                        snapshot.root_char_bag,
                    );
                    // ignore_stack_for_path collapses to `all()` when the
                    // path itself is ignored.
                    fs_entry.is_ignored = ignore_stack.is_all();
                    snapshot.insert_entry(fs_entry);
                    if metadata.is_dir {
                        scan_queue_tx
                            .send(ScanJob {
                                abs_path: event.path,
                                path,
                                ignore_stack,
                                scan_queue: scan_queue_tx.clone(),
                            })
                            .await
                            .unwrap();
                    }
                }
                // The path no longer exists; its removal above stands.
                Ok(None) => {}
                Err(err) => {
                    // TODO - create a special 'error' entry in the entries tree to mark this
                    log::error!("error reading file on event {:?}", err);
                }
            }
        }

        *self.snapshot.lock() = snapshot;

        // Scan any directories that were created as part of this event batch.
        drop(scan_queue_tx);
        self.executor
            .scoped(|scope| {
                for _ in 0..self.executor.num_cpus() {
                    scope.spawn(async {
                        while let Ok(job) = scan_queue_rx.recv().await {
                            if let Err(err) = self
                                .scan_dir(root_char_bag, next_entry_id.clone(), &job)
                                .await
                            {
                                log::error!("error scanning {:?}: {}", job.abs_path, err);
                            }
                        }
                    });
                }
            })
            .await;

        // Attempt to detect renames only over a single batch of file-system events.
        self.snapshot.lock().removed_entry_ids.clear();

        self.update_ignore_statuses().await;
        true
    }

    /// Re-evaluates gitignore status for directories whose `.gitignore`
    /// changed this scan, and drops ignore entries whose files are gone.
    async fn update_ignore_statuses(&self) {
        let mut snapshot = self.snapshot();

        let mut ignores_to_update = Vec::new();
        let mut ignores_to_delete = Vec::new();
        for (parent_path, (_, scan_id)) in &snapshot.ignores {
            // Touched during this scan and the directory still exists.
            if *scan_id == snapshot.scan_id && snapshot.entry_for_path(parent_path).is_some() {
                ignores_to_update.push(parent_path.clone());
            }

            let ignore_path = parent_path.join(&*GITIGNORE);
            if snapshot.entry_for_path(ignore_path).is_none() {
                ignores_to_delete.push(parent_path.clone());
            }
        }

        for parent_path in ignores_to_delete {
            snapshot.ignores.remove(&parent_path);
            self.snapshot.lock().ignores.remove(&parent_path);
        }

        let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
        ignores_to_update.sort_unstable();
        let mut ignores_to_update = ignores_to_update.into_iter().peekable();
        while let Some(parent_path) = ignores_to_update.next() {
            // Skip descendants: updating the ancestor re-walks them anyway.
            while ignores_to_update
                .peek()
                .map_or(false, |p| p.starts_with(&parent_path))
            {
                ignores_to_update.next().unwrap();
            }

            let ignore_stack = snapshot.ignore_stack_for_path(&parent_path, true);
            ignore_queue_tx
                .send(UpdateIgnoreStatusJob {
                    path: parent_path,
                    ignore_stack,
                    ignore_queue: ignore_queue_tx.clone(),
                })
                .await
                .unwrap();
        }
        drop(ignore_queue_tx);

        self.executor
            .scoped(|scope| {
                for _ in 0..self.executor.num_cpus() {
                    scope.spawn(async {
                        while let Ok(job) = ignore_queue_rx.recv().await {
                            self.update_ignore_status(job, &snapshot).await;
                        }
                    });
                }
            })
            .await;
    }

    /// Recomputes `is_ignored` for the children of one directory, enqueueing
    /// a job for each child directory, and writes any changed entries back
    /// into the shared snapshot.
    async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) {
        let mut ignore_stack = job.ignore_stack;
        if let Some((ignore, _)) = snapshot.ignores.get(&job.path) {
            ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
        }

        let mut edits = Vec::new();
        for mut entry in snapshot.child_entries(&job.path).cloned() {
            let was_ignored = entry.is_ignored;
            entry.is_ignored = ignore_stack.is_path_ignored(entry.path(), entry.is_dir());
            if entry.is_dir() {
                let child_ignore_stack = if entry.is_ignored {
                    IgnoreStack::all()
                } else {
                    ignore_stack.clone()
                };
                job.ignore_queue
                    .send(UpdateIgnoreStatusJob {
                        path: entry.path().clone(),
                        ignore_stack: child_ignore_stack,
                        ignore_queue: job.ignore_queue.clone(),
                    })
                    .await
                    .unwrap();
            }

            // Only write back entries whose status actually changed.
            if entry.is_ignored != was_ignored {
                edits.push(Edit::Insert(entry));
            }
        }
        self.snapshot.lock().entries_by_path.edit(edits, &());
    }
}
2180
2181async fn refresh_entry(
2182 fs: &dyn Fs,
2183 snapshot: &Mutex<Snapshot>,
2184 path: Arc<Path>,
2185 abs_path: &Path,
2186) -> Result<Entry> {
2187 let root_char_bag;
2188 let next_entry_id;
2189 {
2190 let snapshot = snapshot.lock();
2191 root_char_bag = snapshot.root_char_bag;
2192 next_entry_id = snapshot.next_entry_id.clone();
2193 }
2194 let entry = Entry::new(
2195 path,
2196 &fs.metadata(abs_path)
2197 .await?
2198 .ok_or_else(|| anyhow!("could not read saved file metadata"))?,
2199 &next_entry_id,
2200 root_char_bag,
2201 );
2202 Ok(snapshot.lock().insert_entry(entry))
2203}
2204
2205fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
2206 let mut result = root_char_bag;
2207 result.extend(
2208 path.to_string_lossy()
2209 .chars()
2210 .map(|c| c.to_ascii_lowercase()),
2211 );
2212 result
2213}
2214
/// A unit of directory-scanning work. Jobs carry a clone of their own queue
/// sender so child directories can be enqueued recursively; the channel
/// closes when the last job (and its sender) is dropped.
struct ScanJob {
    abs_path: PathBuf,
    /// Worktree-relative path of the directory to scan.
    path: Arc<Path>,
    /// Ignore rules in effect for this directory's children.
    ignore_stack: Arc<IgnoreStack>,
    scan_queue: Sender<ScanJob>,
}
2221
/// A unit of ignore-status refresh work for one directory; like `ScanJob`,
/// it carries its queue's sender for recursive enqueueing.
struct UpdateIgnoreStatusJob {
    path: Arc<Path>,
    ignore_stack: Arc<IgnoreStack>,
    ignore_queue: Sender<UpdateIgnoreStatusJob>,
}
2227
/// Test-only extension methods on worktree model handles.
pub trait WorktreeHandle {
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()>;
}
2235
impl WorktreeHandle for ModelHandle<Worktree> {
    // The worktree's FS event stream sometimes delivers "redundant" events for FS changes that
    // occurred before the worktree was constructed. These events can cause the worktree to perform
    // extra directory scans, and emit extra scan-state notifications.
    //
    // This function mutates the worktree's directory and waits for those mutations to be picked up,
    // to ensure that all redundant FS events have already been processed.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()> {
        use smol::future::FutureExt;

        let filename = "fs-event-sentinel";
        let root_path = cx.read(|cx| self.read(cx).abs_path.clone());
        let tree = self.clone();
        async move {
            // Create then delete a sentinel file, waiting for the worktree to
            // observe each change; by then, earlier events must be drained.
            std::fs::write(root_path.join(filename), "").unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_some())
                .await;

            std::fs::remove_file(root_path.join(filename)).unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_none())
                .await;

            cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
                .await;
        }
        .boxed_local()
    }
}
2268
/// Iterator over a snapshot's file entries, either all files or only
/// non-ignored ("visible") files, driven by a counting cursor dimension.
pub enum FileIter<'a> {
    All(Cursor<'a, Entry, FileCount, ()>),
    Visible(Cursor<'a, Entry, VisibleFileCount, ()>),
}
2273
impl<'a> FileIter<'a> {
    /// Iterator over every file, positioned at the `start`-th file.
    fn all(snapshot: &'a Snapshot, start: usize) -> Self {
        let mut cursor = snapshot.entries_by_path.cursor();
        // Bias::Right lands on the first entry *at or after* the target
        // count, skipping directories.
        cursor.seek(&FileCount(start), Bias::Right, &());
        Self::All(cursor)
    }

    /// Iterator over non-ignored files, positioned at the `start`-th one.
    fn visible(snapshot: &'a Snapshot, start: usize) -> Self {
        let mut cursor = snapshot.entries_by_path.cursor();
        cursor.seek(&VisibleFileCount(start), Bias::Right, &());
        Self::Visible(cursor)
    }

    /// Advances to the next file by seeking one past the current count.
    fn next_internal(&mut self) {
        match self {
            Self::All(cursor) => {
                let ix = *cursor.seek_start();
                cursor.seek_forward(&FileCount(ix.0 + 1), Bias::Right, &());
            }
            Self::Visible(cursor) => {
                let ix = *cursor.seek_start();
                cursor.seek_forward(&VisibleFileCount(ix.0 + 1), Bias::Right, &());
            }
        }
    }

    /// The entry the iterator is currently positioned on, if any.
    fn item(&self) -> Option<&'a Entry> {
        match self {
            Self::All(cursor) => cursor.item(),
            Self::Visible(cursor) => cursor.item(),
        }
    }
}
2307
2308impl<'a> Iterator for FileIter<'a> {
2309 type Item = &'a Entry;
2310
2311 fn next(&mut self) -> Option<Self::Item> {
2312 if let Some(entry) = self.item() {
2313 self.next_internal();
2314 Some(entry)
2315 } else {
2316 None
2317 }
2318 }
2319}
2320
/// Iterates the child entries of `parent_path` in a `Snapshot`. As it yields
/// an entry it seeks past that entry's own subtree (see the `Iterator` impl),
/// so only direct children of `parent_path` are produced.
struct ChildEntriesIter<'a> {
    parent_path: &'a Path,
    cursor: Cursor<'a, Entry, PathSearch<'a>, ()>,
}
2325
2326impl<'a> ChildEntriesIter<'a> {
2327 fn new(parent_path: &'a Path, snapshot: &'a Snapshot) -> Self {
2328 let mut cursor = snapshot.entries_by_path.cursor();
2329 cursor.seek(&PathSearch::Exact(parent_path), Bias::Right, &());
2330 Self {
2331 parent_path,
2332 cursor,
2333 }
2334 }
2335}
2336
2337impl<'a> Iterator for ChildEntriesIter<'a> {
2338 type Item = &'a Entry;
2339
2340 fn next(&mut self) -> Option<Self::Item> {
2341 if let Some(item) = self.cursor.item() {
2342 if item.path().starts_with(self.parent_path) {
2343 self.cursor
2344 .seek_forward(&PathSearch::Successor(item.path()), Bias::Left, &());
2345 Some(item)
2346 } else {
2347 None
2348 }
2349 } else {
2350 None
2351 }
2352 }
2353}
2354
2355impl<'a> From<&'a Entry> for proto::Entry {
2356 fn from(entry: &'a Entry) -> Self {
2357 Self {
2358 id: entry.id as u64,
2359 is_dir: entry.is_dir(),
2360 path: entry.path.to_string_lossy().to_string(),
2361 inode: entry.inode,
2362 mtime: Some(entry.mtime.into()),
2363 is_symlink: entry.is_symlink,
2364 is_ignored: entry.is_ignored,
2365 }
2366 }
2367}
2368
2369impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
2370 type Error = anyhow::Error;
2371
2372 fn try_from((root_char_bag, entry): (&'a CharBag, proto::Entry)) -> Result<Self> {
2373 if let Some(mtime) = entry.mtime {
2374 let kind = if entry.is_dir {
2375 EntryKind::Dir
2376 } else {
2377 let mut char_bag = root_char_bag.clone();
2378 char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase()));
2379 EntryKind::File(char_bag)
2380 };
2381 let path: Arc<Path> = Arc::from(Path::new(&entry.path));
2382 Ok(Entry {
2383 id: entry.id as usize,
2384 kind,
2385 path: path.clone(),
2386 inode: entry.inode,
2387 mtime: mtime.into(),
2388 is_symlink: entry.is_symlink,
2389 is_ignored: entry.is_ignored,
2390 })
2391 } else {
2392 Err(anyhow!(
2393 "missing mtime in remote worktree entry {:?}",
2394 entry.path
2395 ))
2396 }
2397 }
2398}
2399
// Handlers for RPC messages arriving from remote peers. Each handler looks up
// the shared worktree named in the message's payload and forwards the request
// to the corresponding `Worktree` model.
mod remote {
    use super::*;

    /// Adds a peer to the shared worktree identified in the message.
    pub async fn add_peer(
        envelope: TypedEnvelope<proto::AddPeer>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        rpc.state
            .read()
            .await
            .shared_worktree(envelope.payload.worktree_id, cx)?
            .update(cx, |worktree, cx| worktree.add_peer(envelope, cx))
    }

    /// Removes a peer from the shared worktree identified in the message.
    pub async fn remove_peer(
        envelope: TypedEnvelope<proto::RemovePeer>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        rpc.state
            .read()
            .await
            .shared_worktree(envelope.payload.worktree_id, cx)?
            .update(cx, |worktree, cx| worktree.remove_peer(envelope, cx))
    }

    /// Forwards a snapshot update to a remote worktree's update channel.
    /// Errors if the id resolves to a local worktree, which should never
    /// receive `UpdateWorktree` messages.
    pub async fn update_worktree(
        envelope: TypedEnvelope<proto::UpdateWorktree>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        rpc.state
            .read()
            .await
            .shared_worktree(envelope.payload.worktree_id, cx)?
            .update(cx, |worktree, _| {
                if let Some(worktree) = worktree.as_remote_mut() {
                    let mut tx = worktree.updates_tx.clone();
                    // Send outside the model update so the model lock isn't
                    // held while awaiting channel capacity.
                    Ok(async move {
                        tx.send(envelope.payload)
                            .await
                            .expect("receiver runs to completion");
                    })
                } else {
                    Err(anyhow!(
                        "invalid update message for local worktree {}",
                        envelope.payload.worktree_id
                    ))
                }
            })?
            .await;

        Ok(())
    }

    /// Opens a buffer on behalf of a remote peer and responds with its
    /// contents. Only valid for local worktrees (hence the `unwrap`).
    pub async fn open_buffer(
        envelope: TypedEnvelope<proto::OpenBuffer>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        let receipt = envelope.receipt();
        let worktree = rpc
            .state
            .read()
            .await
            .shared_worktree(envelope.payload.worktree_id, cx)?;

        let response = worktree
            .update(cx, |worktree, cx| {
                worktree
                    .as_local_mut()
                    .unwrap()
                    .open_remote_buffer(envelope, cx)
            })
            .await?;

        rpc.respond(receipt, response).await?;

        Ok(())
    }

    /// Releases a buffer previously opened by a remote peer.
    pub async fn close_buffer(
        envelope: TypedEnvelope<proto::CloseBuffer>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        let worktree = rpc
            .state
            .read()
            .await
            .shared_worktree(envelope.payload.worktree_id, cx)?;

        worktree.update(cx, |worktree, cx| {
            worktree
                .as_local_mut()
                .unwrap()
                .close_remote_buffer(envelope, cx)
        })
    }

    /// Applies remote edits to a buffer in the shared worktree.
    pub async fn update_buffer(
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        let message = envelope.payload;
        rpc.state
            .read()
            .await
            .shared_worktree(message.worktree_id, cx)?
            .update(cx, |tree, cx| tree.update_buffer(message, cx))?;
        Ok(())
    }

    /// Saves a buffer on behalf of a remote peer, then responds with the
    /// resulting version vector and mtime.
    pub async fn save_buffer(
        envelope: TypedEnvelope<proto::SaveBuffer>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        // NOTE(review): this read guard stays alive across the `save` and
        // `respond` awaits below — confirm `rpc.state` tolerates being held
        // that long (it is an async lock, so this blocks writers meanwhile).
        let state = rpc.state.read().await;
        let worktree = state.shared_worktree(envelope.payload.worktree_id, cx)?;
        // The buffer is looked up by the peer that originally opened it.
        let sender_id = envelope.original_sender_id()?;
        let buffer = worktree.read_with(cx, |tree, _| {
            tree.as_local()
                .unwrap()
                .shared_buffers
                .get(&sender_id)
                .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
                .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))
        })?;
        let (version, mtime) = buffer.update(cx, |buffer, cx| buffer.save(cx))?.await?;
        rpc.respond(
            envelope.receipt(),
            proto::BufferSaved {
                worktree_id: envelope.payload.worktree_id,
                buffer_id: envelope.payload.buffer_id,
                version: (&version).into(),
                mtime: Some(mtime.into()),
            },
        )
        .await?;
        Ok(())
    }

    /// Notifies the local replica of a buffer that it was saved remotely.
    pub async fn buffer_saved(
        envelope: TypedEnvelope<proto::BufferSaved>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        rpc.state
            .read()
            .await
            .shared_worktree(envelope.payload.worktree_id, cx)?
            .update(cx, |worktree, cx| {
                worktree.buffer_saved(envelope.payload, cx)
            })?;
        Ok(())
    }
}
2560
2561#[cfg(test)]
2562mod tests {
2563 use super::*;
2564 use crate::test::*;
2565 use anyhow::Result;
2566 use fs::RealFs;
2567 use rand::prelude::*;
2568 use serde_json::json;
2569 use std::time::UNIX_EPOCH;
2570 use std::{env, fmt::Write, os::unix, time::SystemTime};
2571
    // Scans a tree that is reached through a symlinked root and contains an
    // internal symlinked directory, then exercises fuzzy path matching.
    #[gpui::test]
    async fn test_populate_and_search(cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "root": {
                "apple": "",
                "banana": {
                    "carrot": {
                        "date": "",
                        "endive": "",
                    }
                },
                "fennel": {
                    "grape": "",
                }
            }
        }));

        // "root_link" points at "root"; "finnochio" points at "fennel".
        let root_link_path = dir.path().join("root_link");
        unix::fs::symlink(&dir.path().join("root"), &root_link_path).unwrap();
        unix::fs::symlink(
            &dir.path().join("root/fennel"),
            &dir.path().join("root/finnochio"),
        )
        .unwrap();

        // Open the worktree through the symlinked root.
        let tree = Worktree::open_local(
            root_link_path,
            Default::default(),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        let snapshot = cx.read(|cx| {
            let tree = tree.read(cx);
            assert_eq!(tree.file_count(), 5);
            // The symlinked directory resolves to the same inode as its
            // target, so the same file is reachable under both paths.
            assert_eq!(
                tree.inode_for_path("fennel/grape"),
                tree.inode_for_path("finnochio/grape")
            );

            tree.snapshot()
        });
        // Fuzzy-match the query "bna" against the scanned paths; only the
        // "banana/carrot" files should match.
        let results = cx
            .read(|cx| {
                match_paths(
                    Some(&snapshot).into_iter(),
                    "bna",
                    false,
                    false,
                    false,
                    10,
                    Default::default(),
                    cx.background().clone(),
                )
            })
            .await;
        assert_eq!(
            results
                .into_iter()
                .map(|result| result.path)
                .collect::<Vec<Arc<Path>>>(),
            vec![
                PathBuf::from("banana/carrot/date").into(),
                PathBuf::from("banana/carrot/endive").into(),
            ]
        );
    }
2643
    // Edits a buffer opened from a directory worktree, saves it, and checks
    // that the file on disk matches the buffer afterwards.
    #[gpui::test]
    async fn test_save_file(mut cx: gpui::TestAppContext) {
        let app_state = cx.read(build_app_state);
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));
        let tree = Worktree::open_local(
            dir.path(),
            app_state.languages.clone(),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        // Insert 160 KiB of text so the save performs a non-trivial write.
        let save = buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        let new_text = std::fs::read_to_string(dir.path().join("file1")).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
2671
    // Same as `test_save_file`, but the worktree root is the file itself, so
    // the buffer is opened with the empty relative path "".
    #[gpui::test]
    async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) {
        let app_state = cx.read(build_app_state);
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));
        let file_path = dir.path().join("file1");

        let tree = Worktree::open_local(
            file_path.clone(),
            app_state.languages.clone(),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        // A single-file worktree contains exactly one entry.
        cx.read(|cx| assert_eq!(tree.read(cx).file_count(), 1));

        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("", cx))
            .await
            .unwrap();
        let save = buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        let new_text = std::fs::read_to_string(file_path).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
2705
    // Renames and deletes files on disk after opening buffers for them, then
    // verifies that (1) entry ids survive renames, (2) open buffers track the
    // new paths, and (3) a remote replica converges after applying the
    // snapshot delta.
    #[gpui::test]
    async fn test_rescan_and_remote_updates(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "a": {
                "file1": "",
                "file2": "",
                "file3": "",
            },
            "b": {
                "c": {
                    "file4": "",
                    "file5": "",
                }
            }
        }));

        let tree = Worktree::open_local(
            dir.path(),
            Default::default(),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        // Helper closures: open a buffer for a path, and look up the stable
        // entry id for a path.
        let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| {
            let buffer = tree.update(cx, |tree, cx| tree.open_buffer(path, cx));
            async move { buffer.await.unwrap() }
        };
        let id_for_path = |path: &'static str, cx: &gpui::TestAppContext| {
            tree.read_with(cx, |tree, _| {
                tree.entry_for_path(path)
                    .expect(&format!("no entry for path {}", path))
                    .id
            })
        };

        let buffer2 = buffer_for_path("a/file2", &mut cx).await;
        let buffer3 = buffer_for_path("a/file3", &mut cx).await;
        let buffer4 = buffer_for_path("b/c/file4", &mut cx).await;
        let buffer5 = buffer_for_path("b/c/file5", &mut cx).await;

        let file2_id = id_for_path("a/file2", &cx);
        let file3_id = id_for_path("a/file3", &cx);
        let file4_id = id_for_path("b/c/file4", &cx);

        // Wait for the initial scan.
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;

        // Create a remote copy of this worktree.
        let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
        let worktree_id = 1;
        let share_request = tree
            .update(&mut cx, |tree, cx| {
                tree.as_local().unwrap().share_request(cx)
            })
            .await;
        let remote = Worktree::remote(
            proto::OpenWorktreeResponse {
                worktree_id,
                worktree: share_request.worktree,
                replica_id: 1,
                peers: Vec::new(),
            },
            rpc::Client::new(Default::default()),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        cx.read(|cx| {
            assert!(!buffer2.read(cx).is_dirty());
            assert!(!buffer3.read(cx).is_dirty());
            assert!(!buffer4.read(cx).is_dirty());
            assert!(!buffer5.read(cx).is_dirty());
        });

        // Rename and delete files and directories.
        tree.flush_fs_events(&cx).await;
        std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
        std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
        std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap();
        std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
        tree.flush_fs_events(&cx).await;

        let expected_paths = vec![
            "a",
            "a/file1",
            "a/file2.new",
            "b",
            "d",
            "d/file3",
            "d/file4",
        ];

        cx.read(|app| {
            assert_eq!(
                tree.read(app)
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );

            // Entry ids are stable across renames (tracked by inode).
            assert_eq!(id_for_path("a/file2.new", &cx), file2_id);
            assert_eq!(id_for_path("d/file3", &cx), file3_id);
            assert_eq!(id_for_path("d/file4", &cx), file4_id);

            // Open buffers follow their files to the new paths...
            assert_eq!(
                buffer2.read(app).file().unwrap().path().as_ref(),
                Path::new("a/file2.new")
            );
            assert_eq!(
                buffer3.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file3")
            );
            assert_eq!(
                buffer4.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file4")
            );
            // ...except the deleted file, which keeps its old path.
            assert_eq!(
                buffer5.read(app).file().unwrap().path().as_ref(),
                Path::new("b/c/file5")
            );

            assert!(!buffer2.read(app).file().unwrap().is_deleted());
            assert!(!buffer3.read(app).file().unwrap().is_deleted());
            assert!(!buffer4.read(app).file().unwrap().is_deleted());
            assert!(buffer5.read(app).file().unwrap().is_deleted());
        });

        // Update the remote worktree. Check that it becomes consistent with the
        // local worktree.
        remote.update(&mut cx, |remote, cx| {
            let update_message = tree
                .read(cx)
                .snapshot()
                .build_update(&initial_snapshot, worktree_id);
            remote
                .as_remote_mut()
                .unwrap()
                .snapshot
                .apply_update(update_message)
                .unwrap();

            assert_eq!(
                remote
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );
        });
    }
2862
    // Verifies that .gitignore rules are applied during the initial scan and
    // re-applied to files created afterwards, and that ".git" itself is
    // always treated as ignored.
    #[gpui::test]
    async fn test_rescan_with_gitignore(cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            ".git": {},
            ".gitignore": "ignored-dir\n",
            "tracked-dir": {
                "tracked-file1": "tracked contents",
            },
            "ignored-dir": {
                "ignored-file1": "ignored contents",
            }
        }));

        let tree = Worktree::open_local(
            dir.path(),
            Default::default(),
            Arc::new(RealFs),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        tree.flush_fs_events(&cx).await;
        cx.read(|cx| {
            let tree = tree.read(cx);
            let tracked = tree.entry_for_path("tracked-dir/tracked-file1").unwrap();
            let ignored = tree.entry_for_path("ignored-dir/ignored-file1").unwrap();
            assert_eq!(tracked.is_ignored(), false);
            assert_eq!(ignored.is_ignored(), true);
        });

        // Files created after the scan inherit their directory's status.
        std::fs::write(dir.path().join("tracked-dir/tracked-file2"), "").unwrap();
        std::fs::write(dir.path().join("ignored-dir/ignored-file2"), "").unwrap();
        tree.flush_fs_events(&cx).await;
        cx.read(|cx| {
            let tree = tree.read(cx);
            let dot_git = tree.entry_for_path(".git").unwrap();
            let tracked = tree.entry_for_path("tracked-dir/tracked-file2").unwrap();
            let ignored = tree.entry_for_path("ignored-dir/ignored-file2").unwrap();
            assert_eq!(tracked.is_ignored(), false);
            assert_eq!(ignored.is_ignored(), true);
            assert_eq!(dot_git.is_ignored(), true);
        });
    }
2908
    // Randomized test: mutates a real directory tree while feeding batches of
    // FS events to a BackgroundScanner, checking snapshot invariants after
    // each batch. Finally re-scans from scratch and asserts the incremental
    // scanner converged to the same state. Tunable via the ITERATIONS,
    // OPERATIONS, INITIAL_ENTRIES, and SEED environment variables.
    #[test]
    fn test_random() {
        let iterations = env::var("ITERATIONS")
            .map(|i| i.parse().unwrap())
            .unwrap_or(100);
        let operations = env::var("OPERATIONS")
            .map(|o| o.parse().unwrap())
            .unwrap_or(40);
        let initial_entries = env::var("INITIAL_ENTRIES")
            .map(|o| o.parse().unwrap())
            .unwrap_or(20);
        // With SEED set, run just that one seed; otherwise run them all.
        let seeds = if let Ok(seed) = env::var("SEED").map(|s| s.parse().unwrap()) {
            seed..seed + 1
        } else {
            0..iterations
        };

        for seed in seeds {
            dbg!(seed);
            let mut rng = StdRng::seed_from_u64(seed);

            // Populate an initial tree (probability 1.0 forces insertions).
            let root_dir = tempdir::TempDir::new(&format!("test-{}", seed)).unwrap();
            for _ in 0..initial_entries {
                randomly_mutate_tree(root_dir.path(), 1.0, &mut rng).unwrap();
            }
            log::info!("Generated initial tree");

            let (notify_tx, _notify_rx) = smol::channel::unbounded();
            let fs = Arc::new(RealFs);
            let next_entry_id = Arc::new(AtomicUsize::new(0));
            let mut initial_snapshot = Snapshot {
                id: 0,
                scan_id: 0,
                abs_path: root_dir.path().into(),
                entries_by_path: Default::default(),
                entries_by_id: Default::default(),
                removed_entry_ids: Default::default(),
                ignores: Default::default(),
                root_name: Default::default(),
                root_char_bag: Default::default(),
                next_entry_id: next_entry_id.clone(),
            };
            initial_snapshot.insert_entry(Entry::new(
                Path::new("").into(),
                &smol::block_on(fs.metadata(root_dir.path()))
                    .unwrap()
                    .unwrap(),
                &next_entry_id,
                Default::default(),
            ));
            let mut scanner = BackgroundScanner::new(
                Arc::new(Mutex::new(initial_snapshot.clone())),
                notify_tx,
                fs.clone(),
                Arc::new(gpui::executor::Background::new()),
            );
            smol::block_on(scanner.scan_dirs()).unwrap();
            scanner.snapshot().check_invariants();

            // Interleave: 40% of the time deliver a random prefix of pending
            // events to the scanner, otherwise mutate the tree again.
            let mut events = Vec::new();
            let mut mutations_len = operations;
            while mutations_len > 1 {
                if !events.is_empty() && rng.gen_bool(0.4) {
                    let len = rng.gen_range(0..=events.len());
                    let to_deliver = events.drain(0..len).collect::<Vec<_>>();
                    log::info!("Delivering events: {:#?}", to_deliver);
                    smol::block_on(scanner.process_events(to_deliver));
                    scanner.snapshot().check_invariants();
                } else {
                    events.extend(randomly_mutate_tree(root_dir.path(), 0.6, &mut rng).unwrap());
                    mutations_len -= 1;
                }
            }
            // Deliver everything left over so the scanner is fully caught up.
            log::info!("Quiescing: {:#?}", events);
            smol::block_on(scanner.process_events(events));
            scanner.snapshot().check_invariants();

            // A fresh scan of the final tree must produce the same entries as
            // the incrementally-updated scanner.
            let (notify_tx, _notify_rx) = smol::channel::unbounded();
            let mut new_scanner = BackgroundScanner::new(
                Arc::new(Mutex::new(initial_snapshot)),
                notify_tx,
                scanner.fs.clone(),
                scanner.executor.clone(),
            );
            smol::block_on(new_scanner.scan_dirs()).unwrap();
            assert_eq!(scanner.snapshot().to_vec(), new_scanner.snapshot().to_vec());
        }
    }
2997
    // Applies one random mutation to the tree rooted at `root_path`: with
    // `insertion_probability`, creates a file or directory; with 5%
    // probability, writes a random .gitignore; otherwise renames or deletes
    // an existing entry. Returns synthetic fsevent notifications describing
    // the touched paths, for feeding to a BackgroundScanner.
    fn randomly_mutate_tree(
        root_path: &Path,
        insertion_probability: f64,
        rng: &mut impl Rng,
    ) -> Result<Vec<fsevent::Event>> {
        let root_path = root_path.canonicalize().unwrap();
        let (dirs, files) = read_dir_recursive(root_path.clone());

        let mut events = Vec::new();
        let mut record_event = |path: PathBuf| {
            events.push(fsevent::Event {
                event_id: SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                flags: fsevent::StreamFlags::empty(),
                path,
            });
        };

        // Force an insertion when the tree is effectively empty (only the
        // root dir), so deletions/renames always have something to act on.
        if (files.is_empty() && dirs.len() == 1) || rng.gen_bool(insertion_probability) {
            let path = dirs.choose(rng).unwrap();
            let new_path = path.join(gen_name(rng));

            if rng.gen() {
                log::info!("Creating dir {:?}", new_path.strip_prefix(root_path)?);
                std::fs::create_dir(&new_path)?;
            } else {
                log::info!("Creating file {:?}", new_path.strip_prefix(root_path)?);
                std::fs::write(&new_path, "")?;
            }
            record_event(new_path);
        } else if rng.gen_bool(0.05) {
            // Write a .gitignore listing a random subset of the directory's
            // descendants (paths relative to the ignore file's directory).
            let ignore_dir_path = dirs.choose(rng).unwrap();
            let ignore_path = ignore_dir_path.join(&*GITIGNORE);

            let (subdirs, subfiles) = read_dir_recursive(ignore_dir_path.clone());
            let files_to_ignore = {
                let len = rng.gen_range(0..=subfiles.len());
                subfiles.choose_multiple(rng, len)
            };
            let dirs_to_ignore = {
                let len = rng.gen_range(0..subdirs.len());
                subdirs.choose_multiple(rng, len)
            };

            let mut ignore_contents = String::new();
            for path_to_ignore in files_to_ignore.chain(dirs_to_ignore) {
                write!(
                    ignore_contents,
                    "{}\n",
                    path_to_ignore
                        .strip_prefix(&ignore_dir_path)?
                        .to_str()
                        .unwrap()
                )
                .unwrap();
            }
            log::info!(
                "Creating {:?} with contents:\n{}",
                ignore_path.strip_prefix(&root_path)?,
                ignore_contents
            );
            std::fs::write(&ignore_path, ignore_contents).unwrap();
            record_event(ignore_path);
        } else {
            // Pick a victim: any file, or any directory except the root
            // (dirs[0] is always the root itself).
            let old_path = {
                let file_path = files.choose(rng);
                let dir_path = dirs[1..].choose(rng);
                file_path.into_iter().chain(dir_path).choose(rng).unwrap()
            };

            let is_rename = rng.gen();
            if is_rename {
                // Never move a directory into its own subtree.
                let new_path_parent = dirs
                    .iter()
                    .filter(|d| !d.starts_with(old_path))
                    .choose(rng)
                    .unwrap();

                // Occasionally replace an existing directory wholesale.
                let overwrite_existing_dir =
                    !old_path.starts_with(&new_path_parent) && rng.gen_bool(0.3);
                let new_path = if overwrite_existing_dir {
                    std::fs::remove_dir_all(&new_path_parent).ok();
                    new_path_parent.to_path_buf()
                } else {
                    new_path_parent.join(gen_name(rng))
                };

                log::info!(
                    "Renaming {:?} to {}{:?}",
                    old_path.strip_prefix(&root_path)?,
                    if overwrite_existing_dir {
                        "overwrite "
                    } else {
                        ""
                    },
                    new_path.strip_prefix(&root_path)?
                );
                std::fs::rename(&old_path, &new_path)?;
                record_event(old_path.clone());
                record_event(new_path);
            } else if old_path.is_dir() {
                let (dirs, files) = read_dir_recursive(old_path.clone());

                // Deleting a directory emits events for every entry in it.
                log::info!("Deleting dir {:?}", old_path.strip_prefix(&root_path)?);
                std::fs::remove_dir_all(&old_path).unwrap();
                for file in files {
                    record_event(file);
                }
                for dir in dirs {
                    record_event(dir);
                }
            } else {
                log::info!("Deleting file {:?}", old_path.strip_prefix(&root_path)?);
                std::fs::remove_file(old_path).unwrap();
                record_event(old_path.clone());
            }
        }

        Ok(events)
    }
3120
3121 fn read_dir_recursive(path: PathBuf) -> (Vec<PathBuf>, Vec<PathBuf>) {
3122 let child_entries = std::fs::read_dir(&path).unwrap();
3123 let mut dirs = vec![path];
3124 let mut files = Vec::new();
3125 for child_entry in child_entries {
3126 let child_path = child_entry.unwrap().path();
3127 if child_path.is_dir() {
3128 let (child_dirs, child_files) = read_dir_recursive(child_path);
3129 dirs.extend(child_dirs);
3130 files.extend(child_files);
3131 } else {
3132 files.push(child_path);
3133 }
3134 }
3135 (dirs, files)
3136 }
3137
3138 fn gen_name(rng: &mut impl Rng) -> String {
3139 (0..6)
3140 .map(|_| rng.sample(rand::distributions::Alphanumeric))
3141 .map(char::from)
3142 .collect()
3143 }
3144
    impl Snapshot {
        // Asserts internal consistency of a snapshot: file iterators agree
        // with the entry tree, a stack-based traversal matches cursor order,
        // and every registered ignore has a corresponding .gitignore entry.
        fn check_invariants(&self) {
            // Both file iterators must visit exactly the file entries, in
            // order; the visible iterator skips ignored files.
            let mut files = self.files(0);
            let mut visible_files = self.visible_files(0);
            for entry in self.entries_by_path.cursor::<(), ()>() {
                if entry.is_file() {
                    assert_eq!(files.next().unwrap().inode(), entry.inode);
                    if !entry.is_ignored {
                        assert_eq!(visible_files.next().unwrap().inode(), entry.inode);
                    }
                }
            }
            assert!(files.next().is_none());
            assert!(visible_files.next().is_none());

            // NOTE(review): despite the name, inserting children at `ix` and
            // popping from the stack's end yields depth-first preorder, which
            // is why this can equal the cursor's traversal order below.
            let mut bfs_paths = Vec::new();
            let mut stack = vec![Path::new("")];
            while let Some(path) = stack.pop() {
                bfs_paths.push(path);
                let ix = stack.len();
                for child_entry in self.child_entries(path) {
                    stack.insert(ix, child_entry.path());
                }
            }

            let dfs_paths = self
                .entries_by_path
                .cursor::<(), ()>()
                .map(|e| e.path().as_ref())
                .collect::<Vec<_>>();
            assert_eq!(bfs_paths, dfs_paths);

            // Every directory with a registered ignore must still exist and
            // still contain its .gitignore file.
            for (ignore_parent_path, _) in &self.ignores {
                assert!(self.entry_for_path(ignore_parent_path).is_some());
                assert!(self
                    .entry_for_path(ignore_parent_path.join(&*GITIGNORE))
                    .is_some());
            }
        }

        // Flattens the snapshot into a sorted (path, inode, is_ignored) list
        // for order-insensitive equality comparisons between snapshots.
        fn to_vec(&self) -> Vec<(&Path, u64, bool)> {
            let mut paths = Vec::new();
            for entry in self.entries_by_path.cursor::<(), ()>() {
                paths.push((entry.path().as_ref(), entry.inode(), entry.is_ignored()));
            }
            paths.sort_by(|a, b| a.0.cmp(&b.0));
            paths
        }
    }
3194}