1mod char_bag;
2mod fuzzy;
3mod ignore;
4
5use self::{char_bag::CharBag, ignore::IgnoreStack};
6use crate::{
7 editor::{self, Buffer, History, Operation, Rope},
8 language::LanguageRegistry,
9 rpc::{self, proto},
10 sum_tree::{self, Cursor, Edit, SumTree},
11 time::{self, ReplicaId},
12 util::Bias,
13};
14use ::ignore::gitignore::Gitignore;
15use anyhow::{anyhow, Context, Result};
16use atomic::Ordering::SeqCst;
17pub use fuzzy::{match_paths, PathMatch};
18use gpui::{
19 scoped_pool, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
20 Task, WeakModelHandle,
21};
22use lazy_static::lazy_static;
23use parking_lot::Mutex;
24use postage::{
25 prelude::{Sink, Stream},
26 watch,
27};
28use smol::{
29 channel::Sender,
30 io::{AsyncReadExt, AsyncWriteExt},
31};
32use std::{
33 cmp,
34 collections::HashMap,
35 convert::TryInto,
36 ffi::{OsStr, OsString},
37 fmt, fs,
38 future::Future,
39 io,
40 ops::Deref,
41 os::unix::fs::MetadataExt,
42 path::{Path, PathBuf},
43 sync::{
44 atomic::{self, AtomicUsize},
45 Arc,
46 },
47 time::{Duration, SystemTime},
48};
49use zed_rpc::{PeerId, TypedEnvelope};
50
lazy_static! {
    // Interned `OsStr` for ".gitignore", used when comparing path file names
    // (see `Snapshot::insert_entry` / `Snapshot::remove_path`) without
    // re-creating the literal on every check.
    static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
}
54
/// Registers the worktree-related RPC message handlers on the given client.
/// The handler functions live in the `remote` module (not shown in this
/// chunk); each call wires one protobuf message type to its handler.
pub fn init(cx: &mut MutableAppContext, rpc: rpc::Client) {
    rpc.on_message(remote::add_guest, cx);
    rpc.on_message(remote::remove_guest, cx);
    rpc.on_message(remote::open_buffer, cx);
    rpc.on_message(remote::close_buffer, cx);
    rpc.on_message(remote::update_buffer, cx);
    rpc.on_message(remote::buffer_saved, cx);
    rpc.on_message(remote::save_buffer, cx);
}
64
/// State of the background filesystem scan for a local worktree.
#[derive(Clone, Debug)]
enum ScanState {
    /// No scan in progress; the background snapshot is settled.
    Idle,
    /// The background scanner is currently traversing the filesystem.
    Scanning,
    /// The scan failed. The error is wrapped in `Arc` so the state can
    /// remain `Clone` (`io::Error` itself is not `Clone`).
    Err(Arc<io::Error>),
}
71
/// A directory tree open in the editor: either backed by the local
/// filesystem, or mirrored from a remote collaborator over RPC.
pub enum Worktree {
    Local(LocalWorktree),
    Remote(RemoteWorktree),
}
76
impl Entity for Worktree {
    type Event = ();

    // Called when the model is dropped. If this worktree participates in a
    // sharing session, deregister it from the client and tell the server.
    fn release(&mut self, cx: &mut MutableAppContext) {
        // A local worktree has an RPC handle only after it has been shared;
        // a remote worktree always has one.
        let rpc = match self {
            Self::Local(tree) => tree.rpc.clone(),
            Self::Remote(tree) => Some((tree.rpc.clone(), tree.remote_id)),
        };

        if let Some((rpc, worktree_id)) = rpc {
            // The cleanup is async (lock + network send), so spawn a detached
            // task; the worktree itself is already gone at this point.
            cx.spawn(|_| async move {
                rpc.state
                    .write()
                    .await
                    .shared_worktrees
                    .remove(&worktree_id);
                if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await {
                    log::error!("error closing worktree {}: {}", worktree_id, err);
                }
            })
            .detach();
        }
    }
}
101
102impl Worktree {
103 pub fn local(
104 path: impl Into<Arc<Path>>,
105 languages: Arc<LanguageRegistry>,
106 cx: &mut ModelContext<Worktree>,
107 ) -> Self {
108 Worktree::Local(LocalWorktree::new(path, languages, cx))
109 }
110
111 pub async fn remote(
112 rpc: rpc::Client,
113 id: u64,
114 access_token: String,
115 languages: Arc<LanguageRegistry>,
116 cx: &mut AsyncAppContext,
117 ) -> Result<ModelHandle<Self>> {
118 let open_worktree_response = rpc
119 .request(proto::OpenWorktree {
120 worktree_id: id,
121 access_token,
122 })
123 .await?;
124 let worktree_message = open_worktree_response
125 .worktree
126 .ok_or_else(|| anyhow!("empty worktree"))?;
127 let replica_id = open_worktree_response.replica_id as ReplicaId;
128 let peers = open_worktree_response.peers;
129 let root_char_bag: CharBag = worktree_message
130 .root_name
131 .chars()
132 .map(|c| c.to_ascii_lowercase())
133 .collect();
134 let root_name = worktree_message.root_name.clone();
135 let (entries, paths_by_id) = cx
136 .background()
137 .spawn(async move {
138 let mut entries = SumTree::new();
139 let mut paths_by_id = rpds::HashTrieMapSync::default();
140 for entry in worktree_message.entries {
141 if let Some(mtime) = entry.mtime {
142 let kind = if entry.is_dir {
143 EntryKind::Dir
144 } else {
145 let mut char_bag = root_char_bag.clone();
146 char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase()));
147 EntryKind::File(char_bag)
148 };
149 let path: Arc<Path> = Arc::from(Path::new(&entry.path));
150 entries.push(
151 Entry {
152 id: entry.id as usize,
153 kind,
154 path: path.clone(),
155 inode: entry.inode,
156 mtime: mtime.into(),
157 is_symlink: entry.is_symlink,
158 is_ignored: entry.is_ignored,
159 },
160 &(),
161 );
162 paths_by_id.insert_mut(entry.id as usize, path);
163 } else {
164 log::warn!("missing mtime in remote worktree entry {:?}", entry.path);
165 }
166 }
167 (entries, paths_by_id)
168 })
169 .await;
170 let worktree = cx.update(|cx| {
171 cx.add_model(|cx| {
172 Worktree::Remote(RemoteWorktree {
173 remote_id: id,
174 replica_id,
175 snapshot: Snapshot {
176 id: cx.model_id(),
177 scan_id: 0,
178 abs_path: Path::new("").into(),
179 root_name,
180 root_char_bag,
181 ignores: Default::default(),
182 entries,
183 paths_by_id,
184 removed_entry_ids: Default::default(),
185 next_entry_id: Default::default(),
186 },
187 rpc: rpc.clone(),
188 open_buffers: Default::default(),
189 peers: peers
190 .into_iter()
191 .map(|p| (PeerId(p.peer_id), p.replica_id as ReplicaId))
192 .collect(),
193 languages,
194 })
195 })
196 });
197 rpc.state
198 .write()
199 .await
200 .shared_worktrees
201 .insert(id, worktree.downgrade());
202 Ok(worktree)
203 }
204
205 pub fn as_local(&self) -> Option<&LocalWorktree> {
206 if let Worktree::Local(worktree) = self {
207 Some(worktree)
208 } else {
209 None
210 }
211 }
212
213 pub fn as_local_mut(&mut self) -> Option<&mut LocalWorktree> {
214 if let Worktree::Local(worktree) = self {
215 Some(worktree)
216 } else {
217 None
218 }
219 }
220
221 pub fn as_remote_mut(&mut self) -> Option<&mut RemoteWorktree> {
222 if let Worktree::Remote(worktree) = self {
223 Some(worktree)
224 } else {
225 None
226 }
227 }
228
229 pub fn snapshot(&self) -> Snapshot {
230 match self {
231 Worktree::Local(worktree) => worktree.snapshot(),
232 Worktree::Remote(worktree) => worktree.snapshot.clone(),
233 }
234 }
235
236 pub fn replica_id(&self) -> ReplicaId {
237 match self {
238 Worktree::Local(_) => 0,
239 Worktree::Remote(worktree) => worktree.replica_id,
240 }
241 }
242
243 pub fn add_guest(
244 &mut self,
245 envelope: TypedEnvelope<proto::AddGuest>,
246 cx: &mut ModelContext<Worktree>,
247 ) -> Result<()> {
248 match self {
249 Worktree::Local(worktree) => worktree.add_guest(envelope, cx),
250 Worktree::Remote(worktree) => worktree.add_guest(envelope, cx),
251 }
252 }
253
254 pub fn remove_guest(
255 &mut self,
256 envelope: TypedEnvelope<proto::RemoveGuest>,
257 cx: &mut ModelContext<Worktree>,
258 ) -> Result<()> {
259 match self {
260 Worktree::Local(worktree) => worktree.remove_guest(envelope, cx),
261 Worktree::Remote(worktree) => worktree.remove_guest(envelope, cx),
262 }
263 }
264
265 pub fn peers(&self) -> &HashMap<PeerId, ReplicaId> {
266 match self {
267 Worktree::Local(worktree) => &worktree.peers,
268 Worktree::Remote(worktree) => &worktree.peers,
269 }
270 }
271
272 pub fn open_buffer(
273 &mut self,
274 path: impl AsRef<Path>,
275 cx: &mut ModelContext<Self>,
276 ) -> Task<Result<ModelHandle<Buffer>>> {
277 match self {
278 Worktree::Local(worktree) => worktree.open_buffer(path.as_ref(), cx),
279 Worktree::Remote(worktree) => worktree.open_buffer(path.as_ref(), cx),
280 }
281 }
282
283 #[cfg(feature = "test-support")]
284 pub fn has_open_buffer(&self, path: impl AsRef<Path>, cx: &AppContext) -> bool {
285 let open_buffers = match self {
286 Worktree::Local(worktree) => &worktree.open_buffers,
287 Worktree::Remote(worktree) => &worktree.open_buffers,
288 };
289
290 let path = path.as_ref();
291 open_buffers
292 .values()
293 .find(|buffer| {
294 if let Some(file) = buffer.upgrade(cx).and_then(|buffer| buffer.read(cx).file()) {
295 file.path.as_ref() == path
296 } else {
297 false
298 }
299 })
300 .is_some()
301 }
302
303 pub fn update_buffer(
304 &mut self,
305 envelope: proto::UpdateBuffer,
306 cx: &mut ModelContext<Self>,
307 ) -> Result<()> {
308 let open_buffers = match self {
309 Worktree::Local(worktree) => &worktree.open_buffers,
310 Worktree::Remote(worktree) => &worktree.open_buffers,
311 };
312 let buffer = open_buffers
313 .get(&(envelope.buffer_id as usize))
314 .and_then(|buf| buf.upgrade(&cx));
315
316 let buffer = if let Some(buffer) = buffer {
317 buffer
318 } else {
319 return if matches!(self, Worktree::Local(_)) {
320 Err(anyhow!(
321 "invalid buffer {} in update buffer message",
322 envelope.buffer_id
323 ))
324 } else {
325 Ok(())
326 };
327 };
328
329 let ops = envelope
330 .operations
331 .into_iter()
332 .map(|op| op.try_into())
333 .collect::<anyhow::Result<Vec<_>>>()?;
334 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
335 Ok(())
336 }
337
338 pub fn buffer_saved(
339 &mut self,
340 message: proto::BufferSaved,
341 cx: &mut ModelContext<Self>,
342 ) -> Result<()> {
343 if let Worktree::Remote(worktree) = self {
344 if let Some(buffer) = worktree
345 .open_buffers
346 .get(&(message.buffer_id as usize))
347 .and_then(|buf| buf.upgrade(&cx))
348 {
349 buffer.update(cx, |buffer, cx| {
350 buffer.did_save(message.version.try_into()?, cx)
351 })?;
352 }
353 Ok(())
354 } else {
355 Err(anyhow!(
356 "invalid buffer {} in buffer saved message",
357 message.buffer_id
358 ))
359 }
360 }
361}
362
363impl Deref for Worktree {
364 type Target = Snapshot;
365
366 fn deref(&self) -> &Self::Target {
367 match self {
368 Worktree::Local(worktree) => &worktree.snapshot,
369 Worktree::Remote(worktree) => &worktree.snapshot,
370 }
371 }
372}
373
pub struct LocalWorktree {
    /// Foreground copy of the tree state, refreshed from
    /// `background_snapshot` by `poll_snapshot`.
    snapshot: Snapshot,
    /// Snapshot mutated by the background scanner thread.
    background_snapshot: Arc<Mutex<Snapshot>>,
    /// Watch channel broadcasting the scanner's current state.
    scan_state: (watch::Sender<ScanState>, watch::Receiver<ScanState>),
    /// Keeps the filesystem event stream alive for the worktree's lifetime.
    _event_stream_handle: fsevent::Handle,
    /// True while a delayed re-poll of the background snapshot is pending.
    poll_scheduled: bool,
    /// Present only after the worktree has been shared: the client plus the
    /// server-assigned worktree id.
    rpc: Option<(rpc::Client, u64)>,
    /// Weak handles to open buffers, keyed by buffer id.
    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
    /// Strong handles to buffers opened on behalf of guests, keeping them
    /// alive per peer until the guest closes them.
    shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
    /// Guests currently collaborating on this worktree.
    peers: HashMap<PeerId, ReplicaId>,
    languages: Arc<LanguageRegistry>,
}
386
387impl LocalWorktree {
388 fn new(
389 path: impl Into<Arc<Path>>,
390 languages: Arc<LanguageRegistry>,
391 cx: &mut ModelContext<Worktree>,
392 ) -> Self {
393 let abs_path = path.into();
394 let (scan_state_tx, scan_state_rx) = smol::channel::unbounded();
395 let id = cx.model_id();
396 let snapshot = Snapshot {
397 id,
398 scan_id: 0,
399 abs_path,
400 root_name: Default::default(),
401 root_char_bag: Default::default(),
402 ignores: Default::default(),
403 entries: Default::default(),
404 paths_by_id: Default::default(),
405 removed_entry_ids: Default::default(),
406 next_entry_id: Default::default(),
407 };
408 let (event_stream, event_stream_handle) =
409 fsevent::EventStream::new(&[snapshot.abs_path.as_ref()], Duration::from_millis(100));
410
411 let background_snapshot = Arc::new(Mutex::new(snapshot.clone()));
412
413 let tree = Self {
414 snapshot,
415 background_snapshot: background_snapshot.clone(),
416 scan_state: watch::channel_with(ScanState::Scanning),
417 _event_stream_handle: event_stream_handle,
418 poll_scheduled: false,
419 open_buffers: Default::default(),
420 shared_buffers: Default::default(),
421 peers: Default::default(),
422 rpc: None,
423 languages,
424 };
425
426 std::thread::spawn(move || {
427 let scanner = BackgroundScanner::new(background_snapshot, scan_state_tx, id);
428 scanner.run(event_stream)
429 });
430
431 cx.spawn(|this, mut cx| {
432 let this = this.downgrade();
433 async move {
434 while let Ok(scan_state) = scan_state_rx.recv().await {
435 let alive = cx.update(|cx| {
436 if let Some(handle) = this.upgrade(&cx) {
437 handle.update(cx, |this, cx| {
438 if let Worktree::Local(worktree) = this {
439 worktree.observe_scan_state(scan_state, cx)
440 } else {
441 unreachable!()
442 }
443 });
444 true
445 } else {
446 false
447 }
448 });
449
450 if !alive {
451 break;
452 }
453 }
454 }
455 })
456 .detach();
457
458 tree
459 }
460
461 pub fn open_buffer(
462 &mut self,
463 path: &Path,
464 cx: &mut ModelContext<Worktree>,
465 ) -> Task<Result<ModelHandle<Buffer>>> {
466 let handle = cx.handle();
467
468 // If there is already a buffer for the given path, then return it.
469 let mut existing_buffer = None;
470 self.open_buffers.retain(|_buffer_id, buffer| {
471 if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
472 if let Some(file) = buffer.read(cx.as_ref()).file() {
473 if file.worktree_id() == handle.id() && file.path.as_ref() == path {
474 existing_buffer = Some(buffer);
475 }
476 }
477 true
478 } else {
479 false
480 }
481 });
482
483 let languages = self.languages.clone();
484 let path = Arc::from(path);
485 cx.spawn(|this, mut cx| async move {
486 if let Some(existing_buffer) = existing_buffer {
487 Ok(existing_buffer)
488 } else {
489 let (file, contents) = this
490 .update(&mut cx, |this, cx| this.as_local().unwrap().load(&path, cx))
491 .await?;
492 let language = languages.select_language(&path).cloned();
493 let buffer = cx.add_model(|cx| {
494 Buffer::from_history(0, History::new(contents.into()), Some(file), language, cx)
495 });
496 this.update(&mut cx, |this, _| {
497 let this = this
498 .as_local_mut()
499 .ok_or_else(|| anyhow!("must be a local worktree"))?;
500 this.open_buffers.insert(buffer.id(), buffer.downgrade());
501 Ok(buffer)
502 })
503 }
504 })
505 }
506
507 pub fn open_remote_buffer(
508 &mut self,
509 envelope: TypedEnvelope<proto::OpenBuffer>,
510 cx: &mut ModelContext<Worktree>,
511 ) -> Task<Result<proto::OpenBufferResponse>> {
512 let peer_id = envelope.original_sender_id();
513 let path = Path::new(&envelope.payload.path);
514
515 let buffer = self.open_buffer(path, cx);
516
517 cx.spawn(|this, mut cx| async move {
518 let buffer = buffer.await?;
519 this.update(&mut cx, |this, cx| {
520 this.as_local_mut()
521 .unwrap()
522 .shared_buffers
523 .entry(peer_id?)
524 .or_default()
525 .insert(buffer.id() as u64, buffer.clone());
526
527 Ok(proto::OpenBufferResponse {
528 buffer: Some(buffer.update(cx.as_mut(), |buffer, cx| buffer.to_proto(cx))),
529 })
530 })
531 })
532 }
533
534 pub fn close_remote_buffer(
535 &mut self,
536 envelope: TypedEnvelope<proto::CloseBuffer>,
537 _: &mut ModelContext<Worktree>,
538 ) -> Result<()> {
539 if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
540 shared_buffers.remove(&envelope.payload.buffer_id);
541 }
542
543 Ok(())
544 }
545
546 pub fn add_guest(
547 &mut self,
548 envelope: TypedEnvelope<proto::AddGuest>,
549 cx: &mut ModelContext<Worktree>,
550 ) -> Result<()> {
551 let guest = envelope
552 .payload
553 .guest
554 .ok_or_else(|| anyhow!("empty peer"))?;
555 self.peers
556 .insert(PeerId(guest.peer_id), guest.replica_id as ReplicaId);
557 cx.notify();
558 Ok(())
559 }
560
561 pub fn remove_guest(
562 &mut self,
563 envelope: TypedEnvelope<proto::RemoveGuest>,
564 cx: &mut ModelContext<Worktree>,
565 ) -> Result<()> {
566 let peer_id = PeerId(envelope.payload.peer_id);
567 let replica_id = self
568 .peers
569 .remove(&peer_id)
570 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
571 self.shared_buffers.remove(&peer_id);
572 for (_, buffer) in &self.open_buffers {
573 if let Some(buffer) = buffer.upgrade(&cx) {
574 buffer.update(cx, |buffer, cx| buffer.remove_guest(replica_id, cx));
575 }
576 }
577 cx.notify();
578 Ok(())
579 }
580
581 pub fn scan_complete(&self) -> impl Future<Output = ()> {
582 let mut scan_state_rx = self.scan_state.1.clone();
583 async move {
584 let mut scan_state = Some(scan_state_rx.borrow().clone());
585 while let Some(ScanState::Scanning) = scan_state {
586 scan_state = scan_state_rx.recv().await;
587 }
588 }
589 }
590
591 fn observe_scan_state(&mut self, scan_state: ScanState, cx: &mut ModelContext<Worktree>) {
592 self.scan_state.0.blocking_send(scan_state).ok();
593 self.poll_snapshot(cx);
594 }
595
596 fn poll_snapshot(&mut self, cx: &mut ModelContext<Worktree>) {
597 self.snapshot = self.background_snapshot.lock().clone();
598 if self.is_scanning() {
599 if !self.poll_scheduled {
600 cx.spawn(|this, mut cx| async move {
601 smol::Timer::after(Duration::from_millis(100)).await;
602 this.update(&mut cx, |this, cx| {
603 let worktree = this.as_local_mut().unwrap();
604 worktree.poll_scheduled = false;
605 worktree.poll_snapshot(cx);
606 })
607 })
608 .detach();
609 self.poll_scheduled = true;
610 }
611 } else {
612 let mut buffers_to_delete = Vec::new();
613 for (buffer_id, buffer) in &self.open_buffers {
614 if let Some(buffer) = buffer.upgrade(&cx) {
615 buffer.update(cx, |buffer, cx| {
616 let buffer_is_clean = !buffer.is_dirty();
617
618 if let Some(file) = buffer.file_mut() {
619 let mut file_changed = false;
620
621 if let Some(entry) = file
622 .entry_id
623 .and_then(|entry_id| self.entry_for_id(entry_id))
624 {
625 if entry.path != file.path {
626 file.path = entry.path.clone();
627 file_changed = true;
628 }
629
630 if entry.mtime != file.mtime {
631 file.mtime = entry.mtime;
632 file_changed = true;
633 if buffer_is_clean {
634 let abs_path = self.absolutize(&file.path);
635 refresh_buffer(abs_path, cx);
636 }
637 }
638 } else if let Some(entry) = self.entry_for_path(&file.path) {
639 file.entry_id = Some(entry.id);
640 file.mtime = entry.mtime;
641 if buffer_is_clean {
642 let abs_path = self.absolutize(&file.path);
643 refresh_buffer(abs_path, cx);
644 }
645 file_changed = true;
646 } else if !file.is_deleted() {
647 if buffer_is_clean {
648 cx.emit(editor::buffer::Event::Dirtied);
649 }
650 file.entry_id = None;
651 file_changed = true;
652 }
653
654 if file_changed {
655 cx.emit(editor::buffer::Event::FileHandleChanged);
656 }
657 }
658 });
659 } else {
660 buffers_to_delete.push(*buffer_id);
661 }
662 }
663
664 for buffer_id in buffers_to_delete {
665 self.open_buffers.remove(&buffer_id);
666 }
667 }
668
669 cx.notify();
670 }
671
672 fn is_scanning(&self) -> bool {
673 if let ScanState::Scanning = *self.scan_state.1.borrow() {
674 true
675 } else {
676 false
677 }
678 }
679
680 pub fn snapshot(&self) -> Snapshot {
681 self.snapshot.clone()
682 }
683
684 pub fn abs_path(&self) -> &Path {
685 self.snapshot.abs_path.as_ref()
686 }
687
688 pub fn contains_abs_path(&self, path: &Path) -> bool {
689 path.starts_with(&self.snapshot.abs_path)
690 }
691
692 fn absolutize(&self, path: &Path) -> PathBuf {
693 if path.file_name().is_some() {
694 self.snapshot.abs_path.join(path)
695 } else {
696 self.snapshot.abs_path.to_path_buf()
697 }
698 }
699
700 fn load(&self, path: &Path, cx: &mut ModelContext<Worktree>) -> Task<Result<(File, String)>> {
701 let handle = cx.handle();
702 let path = Arc::from(path);
703 let abs_path = self.absolutize(&path);
704 let background_snapshot = self.background_snapshot.clone();
705 cx.spawn(|this, mut cx| async move {
706 let mut file = smol::fs::File::open(&abs_path).await?;
707 let mut text = String::new();
708 file.read_to_string(&mut text).await?;
709 // Eagerly populate the snapshot with an updated entry for the loaded file
710 let entry = refresh_entry(&background_snapshot, path, &abs_path)?;
711 this.update(&mut cx, |this, cx| {
712 let this = this.as_local_mut().unwrap();
713 this.poll_snapshot(cx);
714 });
715 Ok((File::new(entry.id, handle, entry.path, entry.mtime), text))
716 })
717 }
718
719 pub fn save_buffer_as(
720 &self,
721 buffer: ModelHandle<Buffer>,
722 path: impl Into<Arc<Path>>,
723 text: Rope,
724 cx: &mut ModelContext<Worktree>,
725 ) -> Task<Result<File>> {
726 let save = self.save(path, text, cx);
727 cx.spawn(|this, mut cx| async move {
728 let entry = save.await?;
729 this.update(&mut cx, |this, cx| {
730 this.as_local_mut()
731 .unwrap()
732 .open_buffers
733 .insert(buffer.id(), buffer.downgrade());
734 Ok(File::new(entry.id, cx.handle(), entry.path, entry.mtime))
735 })
736 })
737 }
738
739 fn save(
740 &self,
741 path: impl Into<Arc<Path>>,
742 text: Rope,
743 cx: &mut ModelContext<Worktree>,
744 ) -> Task<Result<Entry>> {
745 let path = path.into();
746 let abs_path = self.absolutize(&path);
747 let background_snapshot = self.background_snapshot.clone();
748
749 let save = cx.background().spawn(async move {
750 let buffer_size = text.summary().bytes.min(10 * 1024);
751 let file = smol::fs::File::create(&abs_path).await?;
752 let mut writer = smol::io::BufWriter::with_capacity(buffer_size, file);
753 for chunk in text.chunks() {
754 writer.write_all(chunk.as_bytes()).await?;
755 }
756 writer.flush().await?;
757 refresh_entry(&background_snapshot, path.clone(), &abs_path)
758 });
759
760 cx.spawn(|this, mut cx| async move {
761 let entry = save.await?;
762 this.update(&mut cx, |this, cx| {
763 this.as_local_mut().unwrap().poll_snapshot(cx);
764 });
765 Ok(entry)
766 })
767 }
768
769 pub fn share(
770 &mut self,
771 rpc: rpc::Client,
772 cx: &mut ModelContext<Worktree>,
773 ) -> Task<anyhow::Result<(u64, String)>> {
774 let root_name = self.root_name.clone();
775 let snapshot = self.snapshot();
776 let handle = cx.handle();
777 cx.spawn(|this, mut cx| async move {
778 let entries = cx
779 .background()
780 .spawn(async move {
781 snapshot
782 .entries
783 .cursor::<(), ()>()
784 .map(|entry| proto::Entry {
785 id: entry.id as u64,
786 is_dir: entry.is_dir(),
787 path: entry.path.to_string_lossy().to_string(),
788 inode: entry.inode,
789 mtime: Some(entry.mtime.into()),
790 is_symlink: entry.is_symlink,
791 is_ignored: entry.is_ignored,
792 })
793 .collect()
794 })
795 .await;
796
797 let share_response = rpc
798 .request(proto::ShareWorktree {
799 worktree: Some(proto::Worktree { root_name, entries }),
800 })
801 .await?;
802
803 rpc.state
804 .write()
805 .await
806 .shared_worktrees
807 .insert(share_response.worktree_id, handle.downgrade());
808
809 log::info!("sharing worktree {:?}", share_response);
810
811 this.update(&mut cx, |worktree, _| {
812 worktree.as_local_mut().unwrap().rpc = Some((rpc, share_response.worktree_id));
813 });
814 Ok((share_response.worktree_id, share_response.access_token))
815 })
816 }
817}
818
/// Reloads a clean buffer's contents from `abs_path` after the file changed
/// on disk. The read happens on the background executor; failures are
/// logged rather than propagated, since this is best-effort.
pub fn refresh_buffer(abs_path: PathBuf, cx: &mut ModelContext<Buffer>) {
    cx.spawn(|buffer, mut cx| async move {
        let new_text = cx
            .background()
            .spawn(async move {
                let mut file = smol::fs::File::open(&abs_path).await?;
                let mut text = String::new();
                file.read_to_string(&mut text).await?;
                Ok::<_, anyhow::Error>(text.into())
            })
            .await;

        match new_text {
            Err(error) => log::error!("error refreshing buffer after file changed: {}", error),
            Ok(new_text) => {
                buffer
                    .update(&mut cx, |buffer, cx| {
                        buffer.set_text_from_disk(new_text, cx)
                    })
                    .await;
            }
        }
    })
    .detach()
}
844
845impl Deref for LocalWorktree {
846 type Target = Snapshot;
847
848 fn deref(&self) -> &Self::Target {
849 &self.snapshot
850 }
851}
852
853impl fmt::Debug for LocalWorktree {
854 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
855 self.snapshot.fmt(f)
856 }
857}
858
pub struct RemoteWorktree {
    /// The server-assigned id for this worktree.
    remote_id: u64,
    /// Local mirror of the host's entry tree.
    snapshot: Snapshot,
    rpc: rpc::Client,
    /// This peer's replica id for buffers in the worktree.
    replica_id: ReplicaId,
    /// Weak handles to open buffers, keyed by the host-assigned buffer id.
    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
    /// Peers currently collaborating on this worktree.
    peers: HashMap<PeerId, ReplicaId>,
    languages: Arc<LanguageRegistry>,
}
868
869impl RemoteWorktree {
870 pub fn open_buffer(
871 &mut self,
872 path: &Path,
873 cx: &mut ModelContext<Worktree>,
874 ) -> Task<Result<ModelHandle<Buffer>>> {
875 let handle = cx.handle();
876 let mut existing_buffer = None;
877 self.open_buffers.retain(|_buffer_id, buffer| {
878 if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
879 if let Some(file) = buffer.read(cx.as_ref()).file() {
880 if file.worktree_id() == handle.id() && file.path.as_ref() == path {
881 existing_buffer = Some(buffer);
882 }
883 }
884 true
885 } else {
886 false
887 }
888 });
889
890 let rpc = self.rpc.clone();
891 let languages = self.languages.clone();
892 let replica_id = self.replica_id;
893 let remote_worktree_id = self.remote_id;
894 let path = path.to_string_lossy().to_string();
895 cx.spawn(|this, mut cx| async move {
896 if let Some(existing_buffer) = existing_buffer {
897 Ok(existing_buffer)
898 } else {
899 let entry = this
900 .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned())
901 .ok_or_else(|| anyhow!("file does not exist"))?;
902 let file = File::new(entry.id, handle, entry.path, entry.mtime);
903 let language = languages.select_language(&path).cloned();
904 let response = rpc
905 .request(proto::OpenBuffer {
906 worktree_id: remote_worktree_id as u64,
907 path,
908 })
909 .await?;
910 let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
911 let buffer_id = remote_buffer.id;
912 let buffer = cx.add_model(|cx| {
913 Buffer::from_proto(replica_id, remote_buffer, Some(file), language, cx).unwrap()
914 });
915 this.update(&mut cx, |this, _| {
916 let this = this.as_remote_mut().unwrap();
917 this.open_buffers
918 .insert(buffer_id as usize, buffer.downgrade());
919 });
920 Ok(buffer)
921 }
922 })
923 }
924
925 pub fn add_guest(
926 &mut self,
927 envelope: TypedEnvelope<proto::AddGuest>,
928 cx: &mut ModelContext<Worktree>,
929 ) -> Result<()> {
930 let guest = envelope
931 .payload
932 .guest
933 .ok_or_else(|| anyhow!("empty peer"))?;
934 self.peers
935 .insert(PeerId(guest.peer_id), guest.replica_id as ReplicaId);
936 cx.notify();
937 Ok(())
938 }
939
940 pub fn remove_guest(
941 &mut self,
942 envelope: TypedEnvelope<proto::RemoveGuest>,
943 cx: &mut ModelContext<Worktree>,
944 ) -> Result<()> {
945 let peer_id = PeerId(envelope.payload.peer_id);
946 let replica_id = self
947 .peers
948 .remove(&peer_id)
949 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
950 for (_, buffer) in &self.open_buffers {
951 if let Some(buffer) = buffer.upgrade(&cx) {
952 buffer.update(cx, |buffer, cx| buffer.remove_guest(replica_id, cx));
953 }
954 }
955 cx.notify();
956 Ok(())
957 }
958}
959
/// An immutable-ish view of a worktree's entries at a point in time.
/// Cloning is comparatively cheap: the entry tree and path map are
/// persistent structures (`SumTree`, `rpds::HashTrieMapSync`).
#[derive(Clone)]
pub struct Snapshot {
    /// The owning worktree model's id.
    id: usize,
    /// Incremented per scan; used to invalidate stale gitignore entries.
    scan_id: usize,
    /// Absolute root path (empty for remote worktrees).
    abs_path: Arc<Path>,
    root_name: String,
    /// Lowercased characters of the root name, for fuzzy matching.
    root_char_bag: CharBag,
    /// Parsed `.gitignore` files keyed by the directory containing them,
    /// paired with the scan id at which they were loaded.
    ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
    /// All entries, ordered by path.
    entries: SumTree<Entry>,
    /// Reverse index from entry id to its path.
    paths_by_id: rpds::HashTrieMapSync<usize, Arc<Path>>,
    /// Maps inode -> id of a removed entry, so an entry that reappears
    /// (e.g. during an atomic save) can keep its old id.
    removed_entry_ids: HashMap<u64, usize>,
    next_entry_id: Arc<AtomicUsize>,
}
973
impl Snapshot {
    /// Total number of file entries in the tree.
    pub fn file_count(&self) -> usize {
        self.entries.summary().file_count
    }

    /// Number of non-ignored file entries.
    pub fn visible_file_count(&self) -> usize {
        self.entries.summary().visible_file_count
    }

    /// Iterates all files, starting at the `start`-th file.
    pub fn files(&self, start: usize) -> FileIter {
        FileIter::all(self, start)
    }

    /// Iterates the paths of all entries, excluding the root (empty path).
    pub fn paths(&self) -> impl Iterator<Item = &Arc<Path>> {
        let empty_path = Path::new("");
        self.entries
            .cursor::<(), ()>()
            .filter(move |entry| entry.path.as_ref() != empty_path)
            .map(|entry| entry.path())
    }

    /// Iterates non-ignored files, starting at the `start`-th visible file.
    pub fn visible_files(&self, start: usize) -> FileIter {
        FileIter::visible(self, start)
    }

    /// Iterates the direct children of the directory at `path`.
    fn child_entries<'a>(&'a self, path: &'a Path) -> ChildEntriesIter<'a> {
        ChildEntriesIter::new(path, self)
    }

    /// The entry for the worktree root. Panics if the snapshot is empty
    /// (the root entry is assumed to always be present once scanned).
    pub fn root_entry(&self) -> &Entry {
        self.entry_for_path("").unwrap()
    }

    /// Returns the filename of the snapshot's root, plus a trailing slash if the snapshot's root is
    /// a directory.
    pub fn root_name(&self) -> &str {
        &self.root_name
    }

    /// Looks up the entry at an exact worktree-relative path.
    fn entry_for_path(&self, path: impl AsRef<Path>) -> Option<&Entry> {
        let mut cursor = self.entries.cursor::<_, ()>();
        if cursor.seek(&PathSearch::Exact(path.as_ref()), Bias::Left, &()) {
            cursor.item()
        } else {
            None
        }
    }

    /// Looks up an entry by id via the reverse path index.
    fn entry_for_id(&self, id: usize) -> Option<&Entry> {
        let path = self.paths_by_id.get(&id)?;
        self.entry_for_path(path)
    }

    /// The inode of the entry at `path`, if it exists.
    pub fn inode_for_path(&self, path: impl AsRef<Path>) -> Option<u64> {
        self.entry_for_path(path.as_ref()).map(|e| e.inode())
    }

    /// Inserts (or replaces) a single entry. If the entry is a `.gitignore`
    /// file, its rules are parsed and recorded for the containing directory.
    fn insert_entry(&mut self, mut entry: Entry) -> Entry {
        if !entry.is_dir() && entry.path().file_name() == Some(&GITIGNORE) {
            let (ignore, err) = Gitignore::new(self.abs_path.join(entry.path()));
            if let Some(err) = err {
                // A partially-invalid .gitignore still yields usable rules;
                // log and keep going.
                log::error!("error in ignore file {:?} - {:?}", entry.path(), err);
            }

            let ignore_dir_path = entry.path().parent().unwrap();
            self.ignores
                .insert(ignore_dir_path.into(), (Arc::new(ignore), self.scan_id));
        }

        self.reuse_entry_id(&mut entry);
        self.entries.insert_or_replace(entry.clone(), &());
        self.paths_by_id.insert_mut(entry.id, entry.path.clone());
        entry
    }

    /// Populates a pending directory with its scanned children, marking the
    /// parent as a real `Dir` and recording its `.gitignore` if present.
    fn populate_dir(
        &mut self,
        parent_path: Arc<Path>,
        entries: impl IntoIterator<Item = Entry>,
        ignore: Option<Arc<Gitignore>>,
    ) {
        let mut edits = Vec::new();

        let mut parent_entry = self
            .entries
            .get(&PathKey(parent_path.clone()), &())
            .unwrap()
            .clone();
        if let Some(ignore) = ignore {
            self.ignores.insert(parent_path, (ignore, self.scan_id));
        }
        // populate_dir must only be called for directories the scanner
        // previously inserted as PendingDir.
        if matches!(parent_entry.kind, EntryKind::PendingDir) {
            parent_entry.kind = EntryKind::Dir;
        } else {
            unreachable!();
        }
        edits.push(Edit::Insert(parent_entry));

        for mut entry in entries {
            self.reuse_entry_id(&mut entry);
            self.paths_by_id.insert_mut(entry.id, entry.path.clone());
            edits.push(Edit::Insert(entry));
        }
        self.entries.edit(edits, &());
    }

    /// Keeps entry ids stable across removal/re-creation: reuse the id of a
    /// removed entry with the same inode, or of an existing entry at the
    /// same path.
    fn reuse_entry_id(&mut self, entry: &mut Entry) {
        if let Some(removed_entry_id) = self.removed_entry_ids.remove(&entry.inode) {
            entry.id = removed_entry_id;
        } else if let Some(existing_entry) = self.entry_for_path(&entry.path) {
            entry.id = existing_entry.id;
        }
    }

    /// Removes the entry at `path` and everything beneath it, remembering
    /// removed ids by inode (for `reuse_entry_id`) and refreshing the scan
    /// id of a removed `.gitignore`'s directory.
    fn remove_path(&mut self, path: &Path) {
        let mut new_entries;
        let removed_entry_ids;
        {
            // Split the tree into [before `path`] + [the removed subtree] +
            // [after], then stitch the kept pieces back together.
            let mut cursor = self.entries.cursor::<_, ()>();
            new_entries = cursor.slice(&PathSearch::Exact(path), Bias::Left, &());
            removed_entry_ids = cursor.slice(&PathSearch::Successor(path), Bias::Left, &());
            new_entries.push_tree(cursor.suffix(&()), &());
        }
        self.entries = new_entries;
        for entry in removed_entry_ids.cursor::<(), ()>() {
            // Keep the largest id seen for each inode, so a re-created file
            // reuses the most recent identity.
            let removed_entry_id = self
                .removed_entry_ids
                .entry(entry.inode)
                .or_insert(entry.id);
            *removed_entry_id = cmp::max(*removed_entry_id, entry.id);
            self.paths_by_id.remove_mut(&entry.id);
        }

        if path.file_name() == Some(&GITIGNORE) {
            if let Some((_, scan_id)) = self.ignores.get_mut(path.parent().unwrap()) {
                *scan_id = self.scan_id;
            }
        }
    }

    /// Builds the stack of `.gitignore` rules that apply to `path` by
    /// walking its ancestors from the root down. Collapses to `all` as soon
    /// as any ancestor (or the path itself) is ignored.
    fn ignore_stack_for_path(&self, path: &Path, is_dir: bool) -> Arc<IgnoreStack> {
        let mut new_ignores = Vec::new();
        for ancestor in path.ancestors().skip(1) {
            if let Some((ignore, _)) = self.ignores.get(ancestor) {
                new_ignores.push((ancestor, Some(ignore.clone())));
            } else {
                new_ignores.push((ancestor, None));
            }
        }

        let mut ignore_stack = IgnoreStack::none();
        // `new_ignores` was collected leaf-to-root; apply root-first.
        for (parent_path, ignore) in new_ignores.into_iter().rev() {
            if ignore_stack.is_path_ignored(&parent_path, true) {
                ignore_stack = IgnoreStack::all();
                break;
            } else if let Some(ignore) = ignore {
                ignore_stack = ignore_stack.append(Arc::from(parent_path), ignore);
            }
        }

        if ignore_stack.is_path_ignored(path, is_dir) {
            ignore_stack = IgnoreStack::all();
        }

        ignore_stack
    }
}
1141
1142impl fmt::Debug for Snapshot {
1143 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1144 for entry in self.entries.cursor::<(), ()>() {
1145 for _ in entry.path().ancestors().skip(1) {
1146 write!(f, " ")?;
1147 }
1148 writeln!(f, "{:?} (inode: {})", entry.path(), entry.inode())?;
1149 }
1150 Ok(())
1151 }
1152}
1153
/// A handle to a file within a worktree, as seen by an open buffer.
#[derive(Clone, PartialEq)]
pub struct File {
    /// The worktree entry backing this file; `None` once the file has been
    /// deleted from disk (see `is_deleted`).
    entry_id: Option<usize>,
    worktree: ModelHandle<Worktree>,
    /// Path relative to the worktree root.
    pub path: Arc<Path>,
    /// Last known modification time of the file on disk.
    pub mtime: SystemTime,
}
1161
1162impl File {
    /// Creates a handle to an existing worktree entry.
    pub fn new(
        entry_id: usize,
        worktree: ModelHandle<Worktree>,
        path: Arc<Path>,
        mtime: SystemTime,
    ) -> Self {
        Self {
            entry_id: Some(entry_id),
            worktree,
            path,
            mtime,
        }
    }
1176
    /// Broadcasts a buffer edit operation to collaborators, if this file's
    /// worktree has an RPC connection (a shared local worktree or any
    /// remote worktree). Send errors are logged, not propagated.
    pub fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) {
        self.worktree.update(cx, |worktree, cx| {
            if let Some((rpc, remote_id)) = match worktree {
                Worktree::Local(worktree) => worktree.rpc.clone(),
                Worktree::Remote(worktree) => Some((worktree.rpc.clone(), worktree.remote_id)),
            } {
                cx.background()
                    .spawn(async move {
                        if let Err(error) = rpc
                            .send(proto::UpdateBuffer {
                                worktree_id: remote_id,
                                buffer_id,
                                operations: Some(operation).iter().map(Into::into).collect(),
                            })
                            .await
                        {
                            log::error!("error sending buffer operation: {}", error);
                        }
                    })
                    .detach();
            }
        });
    }
1200
1201 pub fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) {
1202 self.worktree.update(cx, |worktree, cx| {
1203 if let Worktree::Remote(worktree) = worktree {
1204 let worktree_id = worktree.remote_id;
1205 let rpc = worktree.rpc.clone();
1206 cx.background()
1207 .spawn(async move {
1208 if let Err(error) = rpc
1209 .send(proto::CloseBuffer {
1210 worktree_id,
1211 buffer_id,
1212 })
1213 .await
1214 {
1215 log::error!("error closing remote buffer: {}", error);
1216 };
1217 })
1218 .detach();
1219 }
1220 });
1221 }
1222
1223 /// Returns this file's path relative to the root of its worktree.
1224 pub fn path(&self) -> Arc<Path> {
1225 self.path.clone()
1226 }
1227
1228 pub fn abs_path(&self, cx: &AppContext) -> PathBuf {
1229 self.worktree.read(cx).abs_path.join(&self.path)
1230 }
1231
1232 /// Returns the last component of this handle's absolute path. If this handle refers to the root
1233 /// of its worktree, then this method will return the name of the worktree itself.
1234 pub fn file_name<'a>(&'a self, cx: &'a AppContext) -> Option<OsString> {
1235 self.path
1236 .file_name()
1237 .or_else(|| Some(OsStr::new(self.worktree.read(cx).root_name())))
1238 .map(Into::into)
1239 }
1240
1241 pub fn is_deleted(&self) -> bool {
1242 self.entry_id.is_none()
1243 }
1244
1245 pub fn exists(&self) -> bool {
1246 !self.is_deleted()
1247 }
1248
1249 pub fn save(
1250 &self,
1251 buffer_id: u64,
1252 text: Rope,
1253 version: time::Global,
1254 cx: &mut MutableAppContext,
1255 ) -> Task<Result<time::Global>> {
1256 self.worktree.update(cx, |worktree, cx| match worktree {
1257 Worktree::Local(worktree) => {
1258 let rpc = worktree.rpc.clone();
1259 let save = worktree.save(self.path.clone(), text, cx);
1260 cx.spawn(|_, _| async move {
1261 save.await?;
1262 if let Some((rpc, worktree_id)) = rpc {
1263 rpc.send(proto::BufferSaved {
1264 worktree_id,
1265 buffer_id,
1266 version: (&version).into(),
1267 })
1268 .await?;
1269 }
1270 Ok(version)
1271 })
1272 }
1273 Worktree::Remote(worktree) => {
1274 let rpc = worktree.rpc.clone();
1275 let worktree_id = worktree.remote_id;
1276 cx.spawn(|_, _| async move {
1277 let response = rpc
1278 .request(proto::SaveBuffer {
1279 worktree_id,
1280 buffer_id,
1281 })
1282 .await?;
1283 Ok(response.version.try_into()?)
1284 })
1285 }
1286 })
1287 }
1288
1289 pub fn worktree_id(&self) -> usize {
1290 self.worktree.id()
1291 }
1292
1293 pub fn entry_id(&self) -> (usize, Arc<Path>) {
1294 (self.worktree.id(), self.path())
1295 }
1296}
1297
/// A single file or directory within a worktree snapshot.
#[derive(Clone, Debug)]
pub struct Entry {
    /// Monotonically-assigned id, drawn from the snapshot's `next_entry_id`.
    id: usize,
    /// Whether this is a file, a scanned directory, or a pending directory.
    kind: EntryKind,
    /// Path relative to the worktree root.
    path: Arc<Path>,
    /// Inode number from filesystem metadata.
    inode: u64,
    /// Modification time from filesystem metadata.
    mtime: SystemTime,
    is_symlink: bool,
    /// Whether some `.gitignore` rule excludes this path.
    is_ignored: bool,
}
1308
/// Discriminates worktree entries.
#[derive(Clone, Debug)]
pub enum EntryKind {
    /// A directory that has been discovered but whose contents have not yet
    /// been scanned.
    PendingDir,
    /// A fully-scanned directory.
    Dir,
    /// A file, carrying the character bag used for fuzzy-match filtering.
    File(CharBag),
}
1315
1316impl Entry {
1317 pub fn path(&self) -> &Arc<Path> {
1318 &self.path
1319 }
1320
1321 pub fn inode(&self) -> u64 {
1322 self.inode
1323 }
1324
1325 pub fn is_ignored(&self) -> bool {
1326 self.is_ignored
1327 }
1328
1329 fn is_dir(&self) -> bool {
1330 matches!(self.kind, EntryKind::Dir | EntryKind::PendingDir)
1331 }
1332
1333 fn is_file(&self) -> bool {
1334 matches!(self.kind, EntryKind::File(_))
1335 }
1336}
1337
1338impl sum_tree::Item for Entry {
1339 type Summary = EntrySummary;
1340
1341 fn summary(&self) -> Self::Summary {
1342 let file_count;
1343 let visible_file_count;
1344 if self.is_file() {
1345 file_count = 1;
1346 if self.is_ignored {
1347 visible_file_count = 0;
1348 } else {
1349 visible_file_count = 1;
1350 }
1351 } else {
1352 file_count = 0;
1353 visible_file_count = 0;
1354 }
1355
1356 EntrySummary {
1357 max_path: self.path().clone(),
1358 file_count,
1359 visible_file_count,
1360 }
1361 }
1362}
1363
1364impl sum_tree::KeyedItem for Entry {
1365 type Key = PathKey;
1366
1367 fn key(&self) -> Self::Key {
1368 PathKey(self.path().clone())
1369 }
1370}
1371
/// Aggregate data for a subtree of entries, maintained by the sum tree.
#[derive(Clone, Debug)]
pub struct EntrySummary {
    /// The greatest (last, in path order) path in the subtree.
    max_path: Arc<Path>,
    /// Number of file entries in the subtree.
    file_count: usize,
    /// Number of non-ignored file entries in the subtree.
    visible_file_count: usize,
}
1378
1379impl Default for EntrySummary {
1380 fn default() -> Self {
1381 Self {
1382 max_path: Arc::from(Path::new("")),
1383 file_count: 0,
1384 visible_file_count: 0,
1385 }
1386 }
1387}
1388
1389impl sum_tree::Summary for EntrySummary {
1390 type Context = ();
1391
1392 fn add_summary(&mut self, rhs: &Self, _: &()) {
1393 self.max_path = rhs.max_path.clone();
1394 self.file_count += rhs.file_count;
1395 self.visible_file_count += rhs.visible_file_count;
1396 }
1397}
1398
/// Newtype over an entry's path, used as the sum tree's ordering key.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct PathKey(Arc<Path>);
1401
1402impl Default for PathKey {
1403 fn default() -> Self {
1404 Self(Path::new("").into())
1405 }
1406}
1407
impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
    /// Seeking by `PathKey` tracks the greatest path seen so far.
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.0 = summary.max_path.clone();
    }
}
1413
/// A seek target over the entry tree's path dimension.
///
/// `Exact` addresses the entry with the given path itself, while `Successor`
/// addresses the first entry after the entire subtree rooted at the path.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum PathSearch<'a> {
    Exact(&'a Path),
    Successor(&'a Path),
}

impl<'a> Ord for PathSearch<'a> {
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        use cmp::Ordering::Greater;
        match (self, other) {
            (Self::Exact(a), Self::Exact(b)) => a.cmp(b),
            // The successor of `a` sorts after every path inside `a`'s
            // subtree; outside the subtree, plain path order applies.
            (Self::Successor(a), Self::Exact(b)) if b.starts_with(a) => Greater,
            (Self::Successor(a), Self::Exact(b)) => a.cmp(b),
            // Only the combinations above occur in practice.
            _ => unreachable!("not sure we need the other two cases"),
        }
    }
}

impl<'a> PartialOrd for PathSearch<'a> {
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl<'a> Default for PathSearch<'a> {
    /// Seeks exactly to the empty (root) path.
    fn default() -> Self {
        PathSearch::Exact(Path::new(""))
    }
}
1447
impl<'a: 'b, 'b> sum_tree::Dimension<'a, EntrySummary> for PathSearch<'b> {
    /// Seeking by `PathSearch` tracks the greatest path seen so far as an
    /// `Exact` target.
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        *self = Self::Exact(summary.max_path.as_ref());
    }
}
1453
/// Seek dimension counting all file entries, used to address the n-th file.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct FileCount(usize);

impl<'a> sum_tree::Dimension<'a, EntrySummary> for FileCount {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.0 += summary.file_count;
    }
}
1462
/// Seek dimension counting only non-ignored file entries.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct VisibleFileCount(usize);

impl<'a> sum_tree::Dimension<'a, EntrySummary> for VisibleFileCount {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.0 += summary.visible_file_count;
    }
}
1471
/// Scans a worktree's directory on background threads, keeping the shared
/// snapshot up to date and reporting progress on `notify`.
struct BackgroundScanner {
    /// Snapshot shared with the `Worktree` model; mutated under the lock.
    snapshot: Arc<Mutex<Snapshot>>,
    /// Channel for `ScanState` transitions (Scanning / Idle / Err).
    notify: Sender<ScanState>,
    /// Worker pool used to scan directories in parallel.
    thread_pool: scoped_pool::Pool,
}
1477
impl BackgroundScanner {
    /// Creates a scanner over `snapshot` that reports progress on `notify`.
    /// The pool size (16) bounds filesystem scanning parallelism.
    fn new(snapshot: Arc<Mutex<Snapshot>>, notify: Sender<ScanState>, worktree_id: usize) -> Self {
        Self {
            snapshot,
            notify,
            thread_pool: scoped_pool::Pool::new(16, format!("worktree-{}-scanner", worktree_id)),
        }
    }

    /// The absolute path of the worktree root on disk.
    fn abs_path(&self) -> Arc<Path> {
        self.snapshot.lock().abs_path.clone()
    }

    /// A point-in-time copy of the shared snapshot.
    fn snapshot(&self) -> Snapshot {
        self.snapshot.lock().clone()
    }

    /// Entry point: performs the initial recursive scan, then processes
    /// filesystem events until a send on `notify` fails (i.e. the receiving
    /// side was dropped), which ends the event stream.
    fn run(mut self, event_stream: fsevent::EventStream) {
        if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() {
            return;
        }

        if let Err(err) = self.scan_dirs() {
            if smol::block_on(self.notify.send(ScanState::Err(Arc::new(err)))).is_err() {
                return;
            }
        }

        // NOTE(review): after reporting `Err` above we still fall through and
        // report `Idle`, overwriting the error state — confirm this is intended.
        if smol::block_on(self.notify.send(ScanState::Idle)).is_err() {
            return;
        }

        // Each batch of FS events is bracketed by Scanning/Idle notifications.
        event_stream.run(move |events| {
            if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() {
                return false;
            }

            if !self.process_events(events) {
                return false;
            }

            if smol::block_on(self.notify.send(ScanState::Idle)).is_err() {
                return false;
            }

            true
        });
    }

    /// Performs the initial scan of the worktree root. For a directory root,
    /// inserts a pending root entry and then scans subdirectories in parallel
    /// via a work-stealing queue; for a single-file root, inserts just that
    /// file's entry.
    fn scan_dirs(&mut self) -> io::Result<()> {
        self.snapshot.lock().scan_id += 1;

        let path: Arc<Path> = Arc::from(Path::new(""));
        let abs_path = self.abs_path();
        let metadata = fs::metadata(&abs_path)?;
        let inode = metadata.ino();
        let is_symlink = fs::symlink_metadata(&abs_path)?.file_type().is_symlink();
        let is_dir = metadata.file_type().is_dir();
        let mtime = metadata.modified()?;

        // After determining whether the root entry is a file or a directory, populate the
        // snapshot's "root name", which will be used for the purpose of fuzzy matching.
        let mut root_name = abs_path
            .file_name()
            .map_or(String::new(), |f| f.to_string_lossy().to_string());
        if is_dir {
            root_name.push('/');
        }

        let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
        let next_entry_id;
        {
            let mut snapshot = self.snapshot.lock();
            snapshot.root_name = root_name;
            snapshot.root_char_bag = root_char_bag;
            next_entry_id = snapshot.next_entry_id.clone();
        }

        if is_dir {
            self.snapshot.lock().insert_entry(Entry {
                id: next_entry_id.fetch_add(1, SeqCst),
                kind: EntryKind::PendingDir,
                path: path.clone(),
                inode,
                mtime,
                is_symlink,
                is_ignored: false,
            });

            // Seed the scan queue with the root job. Each scanned directory
            // enqueues jobs for its subdirectories; the channel closes once
            // all senders (this one plus the per-job clones) are dropped.
            let (tx, rx) = crossbeam_channel::unbounded();
            tx.send(ScanJob {
                abs_path: abs_path.to_path_buf(),
                path,
                ignore_stack: IgnoreStack::none(),
                scan_queue: tx.clone(),
            })
            .unwrap();
            drop(tx);

            self.thread_pool.scoped(|pool| {
                for _ in 0..self.thread_pool.thread_count() {
                    pool.execute(|| {
                        while let Ok(job) = rx.recv() {
                            if let Err(err) =
                                self.scan_dir(root_char_bag, next_entry_id.clone(), &job)
                            {
                                log::error!("error scanning {:?}: {}", job.abs_path, err);
                            }
                        }
                    });
                }
            });
        } else {
            // Single-file worktree: the root entry is the file itself.
            self.snapshot.lock().insert_entry(Entry {
                id: next_entry_id.fetch_add(1, SeqCst),
                kind: EntryKind::File(char_bag_for_path(root_char_bag, &path)),
                path,
                inode,
                mtime,
                is_symlink,
                is_ignored: false,
            });
        }

        Ok(())
    }

    /// Scans a single directory: builds entries for its children, enqueues a
    /// `ScanJob` for each subdirectory, and commits the results into the
    /// snapshot via `populate_dir`.
    fn scan_dir(
        &self,
        root_char_bag: CharBag,
        next_entry_id: Arc<AtomicUsize>,
        job: &ScanJob,
    ) -> io::Result<()> {
        let mut new_entries: Vec<Entry> = Vec::new();
        let mut new_jobs: Vec<ScanJob> = Vec::new();
        let mut ignore_stack = job.ignore_stack.clone();
        let mut new_ignore = None;

        for child_entry in fs::read_dir(&job.abs_path)? {
            let child_entry = child_entry?;
            let child_name = child_entry.file_name();
            let child_abs_path = job.abs_path.join(&child_name);
            let child_path: Arc<Path> = job.path.join(&child_name).into();
            let child_is_symlink = child_entry.metadata()?.file_type().is_symlink();
            let child_metadata = if let Ok(metadata) = fs::metadata(&child_abs_path) {
                metadata
            } else {
                log::error!("could not get metadata for path {:?}", child_abs_path);
                continue;
            };

            let child_inode = child_metadata.ino();
            let child_mtime = child_metadata.modified()?;

            // If we find a .gitignore, add it to the stack of ignores used to determine which paths are ignored
            if child_name == *GITIGNORE {
                let (ignore, err) = Gitignore::new(&child_abs_path);
                if let Some(err) = err {
                    log::error!("error in ignore file {:?} - {:?}", child_path, err);
                }
                let ignore = Arc::new(ignore);
                ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
                new_ignore = Some(ignore);

                // Update ignore status of any child entries we've already processed to reflect the
                // ignore file in the current directory. Because `.gitignore` sorts near the start of
                // the directory listing, there should rarely be many such entries. Update the ignore
                // stack associated with any new jobs as well.
                //
                // The shadowed iterator below pairs each already-seen directory
                // entry with its queued `ScanJob`, relying on `new_jobs` holding
                // exactly one job per directory entry, in insertion order.
                let mut new_jobs = new_jobs.iter_mut();
                for entry in &mut new_entries {
                    entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
                    if entry.is_dir() {
                        new_jobs.next().unwrap().ignore_stack = if entry.is_ignored {
                            IgnoreStack::all()
                        } else {
                            ignore_stack.clone()
                        };
                    }
                }
            }

            if child_metadata.is_dir() {
                let is_ignored = ignore_stack.is_path_ignored(&child_path, true);
                new_entries.push(Entry {
                    id: next_entry_id.fetch_add(1, SeqCst),
                    kind: EntryKind::PendingDir,
                    path: child_path.clone(),
                    inode: child_inode,
                    mtime: child_mtime,
                    is_symlink: child_is_symlink,
                    is_ignored,
                });
                new_jobs.push(ScanJob {
                    abs_path: child_abs_path,
                    path: child_path,
                    // Children of an ignored directory are all ignored.
                    ignore_stack: if is_ignored {
                        IgnoreStack::all()
                    } else {
                        ignore_stack.clone()
                    },
                    scan_queue: job.scan_queue.clone(),
                });
            } else {
                let is_ignored = ignore_stack.is_path_ignored(&child_path, false);
                new_entries.push(Entry {
                    id: next_entry_id.fetch_add(1, SeqCst),
                    kind: EntryKind::File(char_bag_for_path(root_char_bag, &child_path)),
                    path: child_path,
                    inode: child_inode,
                    mtime: child_mtime,
                    is_symlink: child_is_symlink,
                    is_ignored,
                });
            };
        }

        // Commit the children before queueing subdirectory jobs, so other
        // workers observe this directory as populated.
        self.snapshot
            .lock()
            .populate_dir(job.path.clone(), new_entries, new_ignore);
        for new_job in new_jobs {
            job.scan_queue.send(new_job).unwrap();
        }

        Ok(())
    }

    /// Applies a batch of filesystem events to a private copy of the
    /// snapshot, rescans any affected directories, then publishes the updated
    /// snapshot. Returns `false` if the worktree root can no longer be
    /// canonicalized (e.g. it was deleted).
    fn process_events(&mut self, mut events: Vec<fsevent::Event>) -> bool {
        let mut snapshot = self.snapshot();
        snapshot.scan_id += 1;

        let root_abs_path = if let Ok(abs_path) = snapshot.abs_path.canonicalize() {
            abs_path
        } else {
            return false;
        };
        let root_char_bag = snapshot.root_char_bag;
        let next_entry_id = snapshot.next_entry_id.clone();

        // Sorting lets the dedup below drop events for paths whose ancestor
        // also has an event — the ancestor rescan will cover them.
        events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
        events.dedup_by(|a, b| a.path.starts_with(&b.path));

        // First remove all affected paths; re-inserting below re-creates the
        // ones that still exist.
        for event in &events {
            match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => snapshot.remove_path(&path),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            }
        }

        let (scan_queue_tx, scan_queue_rx) = crossbeam_channel::unbounded();
        for event in events {
            let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => Arc::from(path.to_path_buf()),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            };

            match fs_entry_for_path(
                snapshot.root_char_bag,
                &next_entry_id,
                path.clone(),
                &event.path,
            ) {
                Ok(Some(mut fs_entry)) => {
                    let is_dir = fs_entry.is_dir();
                    let ignore_stack = snapshot.ignore_stack_for_path(&path, is_dir);
                    fs_entry.is_ignored = ignore_stack.is_all();
                    snapshot.insert_entry(fs_entry);
                    // Directories need a full rescan of their contents.
                    if is_dir {
                        scan_queue_tx
                            .send(ScanJob {
                                abs_path: event.path,
                                path,
                                ignore_stack,
                                scan_queue: scan_queue_tx.clone(),
                            })
                            .unwrap();
                    }
                }
                // The path no longer exists; the removal above stands.
                Ok(None) => {}
                Err(err) => {
                    // TODO - create a special 'error' entry in the entries tree to mark this
                    log::error!("error reading file on event {:?}", err);
                }
            }
        }

        *self.snapshot.lock() = snapshot;

        // Scan any directories that were created as part of this event batch.
        drop(scan_queue_tx);
        self.thread_pool.scoped(|pool| {
            for _ in 0..self.thread_pool.thread_count() {
                pool.execute(|| {
                    while let Ok(job) = scan_queue_rx.recv() {
                        if let Err(err) = self.scan_dir(root_char_bag, next_entry_id.clone(), &job)
                        {
                            log::error!("error scanning {:?}: {}", job.abs_path, err);
                        }
                    }
                });
            }
        });

        // Attempt to detect renames only over a single batch of file-system events.
        self.snapshot.lock().removed_entry_ids.clear();

        self.update_ignore_statuses();
        true
    }

    /// Recomputes ignore status for all entries beneath directories whose
    /// `.gitignore` changed in the current scan, and drops ignore records for
    /// `.gitignore` files that no longer exist.
    fn update_ignore_statuses(&self) {
        let mut snapshot = self.snapshot();

        let mut ignores_to_update = Vec::new();
        let mut ignores_to_delete = Vec::new();
        for (parent_path, (_, scan_id)) in &snapshot.ignores {
            // Only ignore files touched during this scan need re-application.
            if *scan_id == snapshot.scan_id && snapshot.entry_for_path(parent_path).is_some() {
                ignores_to_update.push(parent_path.clone());
            }

            let ignore_path = parent_path.join(&*GITIGNORE);
            if snapshot.entry_for_path(ignore_path).is_none() {
                ignores_to_delete.push(parent_path.clone());
            }
        }

        for parent_path in ignores_to_delete {
            snapshot.ignores.remove(&parent_path);
            self.snapshot.lock().ignores.remove(&parent_path);
        }

        let (ignore_queue_tx, ignore_queue_rx) = crossbeam_channel::unbounded();
        ignores_to_update.sort_unstable();
        let mut ignores_to_update = ignores_to_update.into_iter().peekable();
        while let Some(parent_path) = ignores_to_update.next() {
            // Skip descendants of a path already queued — the recursive
            // update starting at the ancestor will reach them.
            while ignores_to_update
                .peek()
                .map_or(false, |p| p.starts_with(&parent_path))
            {
                ignores_to_update.next().unwrap();
            }

            let ignore_stack = snapshot.ignore_stack_for_path(&parent_path, true);
            ignore_queue_tx
                .send(UpdateIgnoreStatusJob {
                    path: parent_path,
                    ignore_stack,
                    ignore_queue: ignore_queue_tx.clone(),
                })
                .unwrap();
        }
        drop(ignore_queue_tx);

        self.thread_pool.scoped(|scope| {
            for _ in 0..self.thread_pool.thread_count() {
                scope.execute(|| {
                    while let Ok(job) = ignore_queue_rx.recv() {
                        self.update_ignore_status(job, &snapshot);
                    }
                });
            }
        });
    }

    /// Re-evaluates ignore status for the direct children of `job.path`,
    /// recursing into subdirectories via the job queue, and writes any
    /// changed entries back into the shared snapshot.
    fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) {
        let mut ignore_stack = job.ignore_stack;
        if let Some((ignore, _)) = snapshot.ignores.get(&job.path) {
            ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
        }

        let mut edits = Vec::new();
        for mut entry in snapshot.child_entries(&job.path).cloned() {
            let was_ignored = entry.is_ignored;
            entry.is_ignored = ignore_stack.is_path_ignored(entry.path(), entry.is_dir());
            if entry.is_dir() {
                let child_ignore_stack = if entry.is_ignored {
                    IgnoreStack::all()
                } else {
                    ignore_stack.clone()
                };
                job.ignore_queue
                    .send(UpdateIgnoreStatusJob {
                        path: entry.path().clone(),
                        ignore_stack: child_ignore_stack,
                        ignore_queue: job.ignore_queue.clone(),
                    })
                    .unwrap();
            }

            // Only write back entries whose status actually changed.
            if entry.is_ignored != was_ignored {
                edits.push(Edit::Insert(entry));
            }
        }
        self.snapshot.lock().entries.edit(edits, &());
    }
}
1887
1888fn refresh_entry(snapshot: &Mutex<Snapshot>, path: Arc<Path>, abs_path: &Path) -> Result<Entry> {
1889 let root_char_bag;
1890 let next_entry_id;
1891 {
1892 let snapshot = snapshot.lock();
1893 root_char_bag = snapshot.root_char_bag;
1894 next_entry_id = snapshot.next_entry_id.clone();
1895 }
1896 let entry = fs_entry_for_path(root_char_bag, &next_entry_id, path, abs_path)?
1897 .ok_or_else(|| anyhow!("could not read saved file metadata"))?;
1898 Ok(snapshot.lock().insert_entry(entry))
1899}
1900
1901fn fs_entry_for_path(
1902 root_char_bag: CharBag,
1903 next_entry_id: &AtomicUsize,
1904 path: Arc<Path>,
1905 abs_path: &Path,
1906) -> Result<Option<Entry>> {
1907 let metadata = match fs::metadata(&abs_path) {
1908 Err(err) => {
1909 return match (err.kind(), err.raw_os_error()) {
1910 (io::ErrorKind::NotFound, _) => Ok(None),
1911 (io::ErrorKind::Other, Some(libc::ENOTDIR)) => Ok(None),
1912 _ => Err(anyhow::Error::new(err)),
1913 }
1914 }
1915 Ok(metadata) => metadata,
1916 };
1917 let inode = metadata.ino();
1918 let mtime = metadata.modified()?;
1919 let is_symlink = fs::symlink_metadata(&abs_path)
1920 .context("failed to read symlink metadata")?
1921 .file_type()
1922 .is_symlink();
1923
1924 let entry = Entry {
1925 id: next_entry_id.fetch_add(1, SeqCst),
1926 kind: if metadata.file_type().is_dir() {
1927 EntryKind::PendingDir
1928 } else {
1929 EntryKind::File(char_bag_for_path(root_char_bag, &path))
1930 },
1931 path: Arc::from(path),
1932 inode,
1933 mtime,
1934 is_symlink,
1935 is_ignored: false,
1936 };
1937
1938 Ok(Some(entry))
1939}
1940
1941fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
1942 let mut result = root_char_bag;
1943 result.extend(
1944 path.to_string_lossy()
1945 .chars()
1946 .map(|c| c.to_ascii_lowercase()),
1947 );
1948 result
1949}
1950
/// A unit of directory-scanning work processed by the scanner's thread pool.
struct ScanJob {
    /// Absolute path of the directory to scan.
    abs_path: PathBuf,
    /// The same directory, relative to the worktree root.
    path: Arc<Path>,
    /// Ignore rules inherited from ancestor directories.
    ignore_stack: Arc<IgnoreStack>,
    /// Sender for enqueueing jobs for subdirectories; cloning it per job
    /// keeps the channel open until all pending work is queued.
    scan_queue: crossbeam_channel::Sender<ScanJob>,
}
1957
/// A unit of ignore-status recomputation work for one directory's children.
struct UpdateIgnoreStatusJob {
    /// Directory (relative to the worktree root) whose children to update.
    path: Arc<Path>,
    /// Ignore rules in effect for this directory, from its ancestors.
    ignore_stack: Arc<IgnoreStack>,
    /// Sender for recursing into subdirectories.
    ignore_queue: crossbeam_channel::Sender<UpdateIgnoreStatusJob>,
}
1963
/// Test-support extensions for worktree model handles.
pub trait WorktreeHandle {
    /// Forces any buffered filesystem events to be processed; see the impl
    /// below for details.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()>;
}
1971
impl WorktreeHandle for ModelHandle<Worktree> {
    // The worktree's FS event stream sometimes delivers "redundant" events for FS changes that
    // occurred before the worktree was constructed. These events can cause the worktree to perform
    // extra directory scans, and emit extra scan-state notifications.
    //
    // This function mutates the worktree's directory and waits for those mutations to be picked up,
    // to ensure that all redundant FS events have already been processed.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()> {
        use smol::future::FutureExt;

        let filename = "fs-event-sentinel";
        let root_path = cx.read(|cx| self.read(cx).abs_path.clone());
        let tree = self.clone();
        async move {
            // Create then delete a sentinel file; once both changes are
            // observed, every earlier event must have been processed too.
            fs::write(root_path.join(filename), "").unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_some())
                .await;

            fs::remove_file(root_path.join(filename)).unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_none())
                .await;

            // Wait for the scanner to settle back to idle.
            cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
                .await;
        }
        .boxed_local()
    }
}
2004
/// An iterator over a snapshot's file entries, either all of them or only
/// the non-ignored ("visible") ones, driven by the corresponding seek
/// dimension of the entry tree.
pub enum FileIter<'a> {
    All(Cursor<'a, Entry, FileCount, ()>),
    Visible(Cursor<'a, Entry, VisibleFileCount, ()>),
}
2009
2010impl<'a> FileIter<'a> {
2011 fn all(snapshot: &'a Snapshot, start: usize) -> Self {
2012 let mut cursor = snapshot.entries.cursor();
2013 cursor.seek(&FileCount(start), Bias::Right, &());
2014 Self::All(cursor)
2015 }
2016
2017 fn visible(snapshot: &'a Snapshot, start: usize) -> Self {
2018 let mut cursor = snapshot.entries.cursor();
2019 cursor.seek(&VisibleFileCount(start), Bias::Right, &());
2020 Self::Visible(cursor)
2021 }
2022
2023 fn next_internal(&mut self) {
2024 match self {
2025 Self::All(cursor) => {
2026 let ix = *cursor.seek_start();
2027 cursor.seek_forward(&FileCount(ix.0 + 1), Bias::Right, &());
2028 }
2029 Self::Visible(cursor) => {
2030 let ix = *cursor.seek_start();
2031 cursor.seek_forward(&VisibleFileCount(ix.0 + 1), Bias::Right, &());
2032 }
2033 }
2034 }
2035
2036 fn item(&self) -> Option<&'a Entry> {
2037 match self {
2038 Self::All(cursor) => cursor.item(),
2039 Self::Visible(cursor) => cursor.item(),
2040 }
2041 }
2042}
2043
2044impl<'a> Iterator for FileIter<'a> {
2045 type Item = &'a Entry;
2046
2047 fn next(&mut self) -> Option<Self::Item> {
2048 if let Some(entry) = self.item() {
2049 self.next_internal();
2050 Some(entry)
2051 } else {
2052 None
2053 }
2054 }
2055}
2056
/// Iterates over the *direct* children of a directory entry, skipping the
/// subtrees beneath each child.
struct ChildEntriesIter<'a> {
    /// The directory whose children are yielded.
    parent_path: &'a Path,
    cursor: Cursor<'a, Entry, PathSearch<'a>, ()>,
}
2061
2062impl<'a> ChildEntriesIter<'a> {
2063 fn new(parent_path: &'a Path, snapshot: &'a Snapshot) -> Self {
2064 let mut cursor = snapshot.entries.cursor();
2065 cursor.seek(&PathSearch::Exact(parent_path), Bias::Right, &());
2066 Self {
2067 parent_path,
2068 cursor,
2069 }
2070 }
2071}
2072
2073impl<'a> Iterator for ChildEntriesIter<'a> {
2074 type Item = &'a Entry;
2075
2076 fn next(&mut self) -> Option<Self::Item> {
2077 if let Some(item) = self.cursor.item() {
2078 if item.path().starts_with(self.parent_path) {
2079 self.cursor
2080 .seek_forward(&PathSearch::Successor(item.path()), Bias::Left, &());
2081 Some(item)
2082 } else {
2083 None
2084 }
2085 } else {
2086 None
2087 }
2088 }
2089}
2090
/// RPC message handlers registered in `init`, each resolving the target
/// worktree from the client's shared state and delegating to it.
mod remote {
    use super::*;

    /// Adds a guest collaborator to a shared worktree.
    pub async fn add_guest(
        envelope: TypedEnvelope<proto::AddGuest>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        rpc.state
            .read()
            .await
            .shared_worktree(envelope.payload.worktree_id, cx)?
            .update(cx, |worktree, cx| worktree.add_guest(envelope, cx))
    }

    /// Removes a guest collaborator from a shared worktree.
    pub async fn remove_guest(
        envelope: TypedEnvelope<proto::RemoveGuest>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        rpc.state
            .read()
            .await
            .shared_worktree(envelope.payload.worktree_id, cx)?
            .update(cx, |worktree, cx| worktree.remove_guest(envelope, cx))
    }

    /// Opens a buffer on behalf of a remote guest and responds with its
    /// contents.
    pub async fn open_buffer(
        envelope: TypedEnvelope<proto::OpenBuffer>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        // Capture the receipt before the envelope is consumed below.
        let receipt = envelope.receipt();
        let worktree = rpc
            .state
            .read()
            .await
            .shared_worktree(envelope.payload.worktree_id, cx)?;

        let response = worktree
            .update(cx, |worktree, cx| {
                worktree
                    .as_local_mut()
                    .unwrap()
                    .open_remote_buffer(envelope, cx)
            })
            .await?;

        rpc.respond(receipt, response).await?;

        Ok(())
    }

    /// Releases a buffer previously opened by a remote guest.
    pub async fn close_buffer(
        envelope: TypedEnvelope<proto::CloseBuffer>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        let worktree = rpc
            .state
            .read()
            .await
            .shared_worktree(envelope.payload.worktree_id, cx)?;

        worktree.update(cx, |worktree, cx| {
            worktree
                .as_local_mut()
                .unwrap()
                .close_remote_buffer(envelope, cx)
        })
    }

    /// Applies buffer operations received from a collaborator.
    pub async fn update_buffer(
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        let message = envelope.payload;
        rpc.state
            .read()
            .await
            .shared_worktree(message.worktree_id, cx)?
            .update(cx, |tree, cx| tree.update_buffer(message, cx))?;
        Ok(())
    }

    /// Saves a shared buffer on a guest's behalf and responds with the
    /// version that was saved.
    pub async fn save_buffer(
        envelope: TypedEnvelope<proto::SaveBuffer>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        let state = rpc.state.read().await;
        let worktree = state.shared_worktree(envelope.payload.worktree_id, cx)?;
        let sender_id = envelope.original_sender_id()?;
        // Look up the buffer among those shared with the requesting peer.
        let buffer = worktree.read_with(cx, |tree, _| {
            tree.as_local()
                .unwrap()
                .shared_buffers
                .get(&sender_id)
                .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
                .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))
        })?;
        let version = buffer.update(cx, |buffer, cx| buffer.save(cx))?.await?;
        rpc.respond(
            envelope.receipt(),
            proto::BufferSaved {
                worktree_id: envelope.payload.worktree_id,
                buffer_id: envelope.payload.buffer_id,
                version: (&version).into(),
            },
        )
        .await?;
        Ok(())
    }

    /// Notifies a worktree that the host saved one of its buffers.
    pub async fn buffer_saved(
        envelope: TypedEnvelope<proto::BufferSaved>,
        rpc: &rpc::Client,
        cx: &mut AsyncAppContext,
    ) -> anyhow::Result<()> {
        rpc.state
            .read()
            .await
            .shared_worktree(envelope.payload.worktree_id, cx)?
            .update(cx, |worktree, cx| {
                worktree.buffer_saved(envelope.payload, cx)
            })?;
        Ok(())
    }
}
2221
2222#[cfg(test)]
2223mod tests {
2224 use super::*;
2225 use crate::test::*;
2226 use anyhow::Result;
2227 use rand::prelude::*;
2228 use serde_json::json;
2229 use std::time::UNIX_EPOCH;
2230 use std::{env, fmt::Write, os::unix, time::SystemTime};
2231
    /// Scans a worktree rooted at a symlink (containing another symlinked
    /// directory), then verifies the file count, symlink inode dedup, and
    /// fuzzy path matching.
    #[gpui::test]
    async fn test_populate_and_search(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "root": {
                "apple": "",
                "banana": {
                    "carrot": {
                        "date": "",
                        "endive": "",
                    }
                },
                "fennel": {
                    "grape": "",
                }
            }
        }));

        // Access the tree through a symlinked root, and add a symlinked
        // directory inside it.
        let root_link_path = dir.path().join("root_link");
        unix::fs::symlink(&dir.path().join("root"), &root_link_path).unwrap();
        unix::fs::symlink(
            &dir.path().join("root/fennel"),
            &dir.path().join("root/finnochio"),
        )
        .unwrap();

        let tree = cx.add_model(|cx| Worktree::local(root_link_path, Default::default(), cx));

        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        cx.read(|cx| {
            let tree = tree.read(cx);
            assert_eq!(tree.file_count(), 5);

            // Symlinked paths resolve to the same underlying inode.
            assert_eq!(
                tree.inode_for_path("fennel/grape"),
                tree.inode_for_path("finnochio/grape")
            );

            // Fuzzy query "bna" should match only the banana subtree.
            let results = match_paths(
                Some(tree.snapshot()).iter(),
                "bna",
                false,
                false,
                false,
                10,
                Default::default(),
                cx.thread_pool().clone(),
            )
            .into_iter()
            .map(|result| result.path)
            .collect::<Vec<Arc<Path>>>();
            assert_eq!(
                results,
                vec![
                    PathBuf::from("banana/carrot/date").into(),
                    PathBuf::from("banana/carrot/endive").into(),
                ]
            );
        })
    }
2292
    /// Edits a buffer opened from disk, saves it, and verifies the file's
    /// on-disk contents match the buffer text.
    #[gpui::test]
    async fn test_save_file(mut cx: gpui::TestAppContext) {
        let app_state = cx.read(build_app_state);
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));
        let tree = cx.add_model(|cx| Worktree::local(dir.path(), app_state.languages, cx));
        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        // Large edit to make sure the full text is written through.
        let save = buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        let new_text = fs::read_to_string(dir.path().join("file1")).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
2313
    /// Same as `test_save_file`, but with a worktree rooted at a single file
    /// (so the buffer is opened via the empty relative path).
    #[gpui::test]
    async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) {
        let app_state = cx.read(build_app_state);
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));
        let file_path = dir.path().join("file1");

        let tree = cx.add_model(|cx| Worktree::local(file_path.clone(), app_state.languages, cx));
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        cx.read(|cx| assert_eq!(tree.read(cx).file_count(), 1));

        // The root entry is the file itself, addressed by the empty path.
        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("", cx))
            .await
            .unwrap();
        let save = buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        let new_text = fs::read_to_string(file_path).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
2340
2341 #[gpui::test]
2342 async fn test_rescan_simple(mut cx: gpui::TestAppContext) {
2343 let dir = temp_tree(json!({
2344 "a": {
2345 "file1": "",
2346 "file2": "",
2347 "file3": "",
2348 },
2349 "b": {
2350 "c": {
2351 "file4": "",
2352 "file5": "",
2353 }
2354 }
2355 }));
2356
2357 let tree = cx.add_model(|cx| Worktree::local(dir.path(), Default::default(), cx));
2358
2359 let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| {
2360 let buffer = tree.update(cx, |tree, cx| tree.open_buffer(path, cx));
2361 async move { buffer.await.unwrap() }
2362 };
2363 let id_for_path = |path: &'static str, cx: &gpui::TestAppContext| {
2364 tree.read_with(cx, |tree, _| {
2365 tree.entry_for_path(path)
2366 .expect(&format!("no entry for path {}", path))
2367 .id
2368 })
2369 };
2370
2371 let buffer2 = buffer_for_path("a/file2", &mut cx).await;
2372 let buffer3 = buffer_for_path("a/file3", &mut cx).await;
2373 let buffer4 = buffer_for_path("b/c/file4", &mut cx).await;
2374 let buffer5 = buffer_for_path("b/c/file5", &mut cx).await;
2375
2376 let file2_id = id_for_path("a/file2", &cx);
2377 let file3_id = id_for_path("a/file3", &cx);
2378 let file4_id = id_for_path("b/c/file4", &cx);
2379
2380 // After scanning, the worktree knows which files exist and which don't.
2381 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
2382 .await;
2383
2384 cx.read(|cx| {
2385 assert!(!buffer2.read(cx).is_dirty());
2386 assert!(!buffer3.read(cx).is_dirty());
2387 assert!(!buffer4.read(cx).is_dirty());
2388 assert!(!buffer5.read(cx).is_dirty());
2389 });
2390
2391 tree.flush_fs_events(&cx).await;
2392 std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
2393 std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
2394 std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap();
2395 std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
2396 tree.flush_fs_events(&cx).await;
2397
2398 cx.read(|app| {
2399 assert_eq!(
2400 tree.read(app)
2401 .paths()
2402 .map(|p| p.to_str().unwrap())
2403 .collect::<Vec<_>>(),
2404 vec![
2405 "a",
2406 "a/file1",
2407 "a/file2.new",
2408 "b",
2409 "d",
2410 "d/file3",
2411 "d/file4"
2412 ]
2413 );
2414
2415 assert_eq!(id_for_path("a/file2.new", &cx), file2_id);
2416 assert_eq!(id_for_path("d/file3", &cx), file3_id);
2417 assert_eq!(id_for_path("d/file4", &cx), file4_id);
2418
2419 assert_eq!(
2420 buffer2.read(app).file().unwrap().path().as_ref(),
2421 Path::new("a/file2.new")
2422 );
2423 assert_eq!(
2424 buffer3.read(app).file().unwrap().path().as_ref(),
2425 Path::new("d/file3")
2426 );
2427 assert_eq!(
2428 buffer4.read(app).file().unwrap().path().as_ref(),
2429 Path::new("d/file4")
2430 );
2431 assert_eq!(
2432 buffer5.read(app).file().unwrap().path().as_ref(),
2433 Path::new("b/c/file5")
2434 );
2435
2436 assert!(!buffer2.read(app).file().unwrap().is_deleted());
2437 assert!(!buffer3.read(app).file().unwrap().is_deleted());
2438 assert!(!buffer4.read(app).file().unwrap().is_deleted());
2439 assert!(buffer5.read(app).file().unwrap().is_deleted());
2440 });
2441 }
2442
2443 #[gpui::test]
2444 async fn test_rescan_with_gitignore(mut cx: gpui::TestAppContext) {
2445 let dir = temp_tree(json!({
2446 ".git": {},
2447 ".gitignore": "ignored-dir\n",
2448 "tracked-dir": {
2449 "tracked-file1": "tracked contents",
2450 },
2451 "ignored-dir": {
2452 "ignored-file1": "ignored contents",
2453 }
2454 }));
2455
2456 let tree = cx.add_model(|cx| Worktree::local(dir.path(), Default::default(), cx));
2457 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
2458 .await;
2459 tree.flush_fs_events(&cx).await;
2460 cx.read(|cx| {
2461 let tree = tree.read(cx);
2462 let tracked = tree.entry_for_path("tracked-dir/tracked-file1").unwrap();
2463 let ignored = tree.entry_for_path("ignored-dir/ignored-file1").unwrap();
2464 assert_eq!(tracked.is_ignored(), false);
2465 assert_eq!(ignored.is_ignored(), true);
2466 });
2467
2468 fs::write(dir.path().join("tracked-dir/tracked-file2"), "").unwrap();
2469 fs::write(dir.path().join("ignored-dir/ignored-file2"), "").unwrap();
2470 tree.flush_fs_events(&cx).await;
2471 cx.read(|cx| {
2472 let tree = tree.read(cx);
2473 let dot_git = tree.entry_for_path(".git").unwrap();
2474 let tracked = tree.entry_for_path("tracked-dir/tracked-file2").unwrap();
2475 let ignored = tree.entry_for_path("ignored-dir/ignored-file2").unwrap();
2476 assert_eq!(tracked.is_ignored(), false);
2477 assert_eq!(ignored.is_ignored(), true);
2478 assert_eq!(dot_git.is_ignored(), true);
2479 });
2480 }
2481
2482 #[test]
2483 fn test_random() {
2484 let iterations = env::var("ITERATIONS")
2485 .map(|i| i.parse().unwrap())
2486 .unwrap_or(100);
2487 let operations = env::var("OPERATIONS")
2488 .map(|o| o.parse().unwrap())
2489 .unwrap_or(40);
2490 let initial_entries = env::var("INITIAL_ENTRIES")
2491 .map(|o| o.parse().unwrap())
2492 .unwrap_or(20);
2493 let seeds = if let Ok(seed) = env::var("SEED").map(|s| s.parse().unwrap()) {
2494 seed..seed + 1
2495 } else {
2496 0..iterations
2497 };
2498
2499 for seed in seeds {
2500 dbg!(seed);
2501 let mut rng = StdRng::seed_from_u64(seed);
2502
2503 let root_dir = tempdir::TempDir::new(&format!("test-{}", seed)).unwrap();
2504 for _ in 0..initial_entries {
2505 randomly_mutate_tree(root_dir.path(), 1.0, &mut rng).unwrap();
2506 }
2507 log::info!("Generated initial tree");
2508
2509 let (notify_tx, _notify_rx) = smol::channel::unbounded();
2510 let mut scanner = BackgroundScanner::new(
2511 Arc::new(Mutex::new(Snapshot {
2512 id: 0,
2513 scan_id: 0,
2514 abs_path: root_dir.path().into(),
2515 entries: Default::default(),
2516 paths_by_id: Default::default(),
2517 removed_entry_ids: Default::default(),
2518 ignores: Default::default(),
2519 root_name: Default::default(),
2520 root_char_bag: Default::default(),
2521 next_entry_id: Default::default(),
2522 })),
2523 notify_tx,
2524 0,
2525 );
2526 scanner.scan_dirs().unwrap();
2527 scanner.snapshot().check_invariants();
2528
2529 let mut events = Vec::new();
2530 let mut mutations_len = operations;
2531 while mutations_len > 1 {
2532 if !events.is_empty() && rng.gen_bool(0.4) {
2533 let len = rng.gen_range(0..=events.len());
2534 let to_deliver = events.drain(0..len).collect::<Vec<_>>();
2535 log::info!("Delivering events: {:#?}", to_deliver);
2536 scanner.process_events(to_deliver);
2537 scanner.snapshot().check_invariants();
2538 } else {
2539 events.extend(randomly_mutate_tree(root_dir.path(), 0.6, &mut rng).unwrap());
2540 mutations_len -= 1;
2541 }
2542 }
2543 log::info!("Quiescing: {:#?}", events);
2544 scanner.process_events(events);
2545 scanner.snapshot().check_invariants();
2546
2547 let (notify_tx, _notify_rx) = smol::channel::unbounded();
2548 let mut new_scanner = BackgroundScanner::new(
2549 Arc::new(Mutex::new(Snapshot {
2550 id: 0,
2551 scan_id: 0,
2552 abs_path: root_dir.path().into(),
2553 entries: Default::default(),
2554 paths_by_id: Default::default(),
2555 removed_entry_ids: Default::default(),
2556 ignores: Default::default(),
2557 root_name: Default::default(),
2558 root_char_bag: Default::default(),
2559 next_entry_id: Default::default(),
2560 })),
2561 notify_tx,
2562 1,
2563 );
2564 new_scanner.scan_dirs().unwrap();
2565 assert_eq!(scanner.snapshot().to_vec(), new_scanner.snapshot().to_vec());
2566 }
2567 }
2568
    /// Applies one random mutation (create, ignore, rename, or delete) to the
    /// tree rooted at `root_path` and returns the `fsevent::Event`s a file
    /// watcher would have reported for that mutation.
    ///
    /// `insertion_probability` biases the choice toward creating new entries;
    /// passing `1.0` always creates (used to seed the initial tree).
    fn randomly_mutate_tree(
        root_path: &Path,
        insertion_probability: f64,
        rng: &mut impl Rng,
    ) -> Result<Vec<fsevent::Event>> {
        let root_path = root_path.canonicalize().unwrap();
        let (dirs, files) = read_dir_recursive(root_path.clone());

        let mut events = Vec::new();
        // Fabricate a synthetic event for `path`: current time (in seconds)
        // as the event id, no flags.
        let mut record_event = |path: PathBuf| {
            events.push(fsevent::Event {
                event_id: SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                flags: fsevent::StreamFlags::empty(),
                path,
            });
        };

        // Create: forced when the tree contains only the empty root (nothing
        // to rename/delete), otherwise chosen with `insertion_probability`.
        if (files.is_empty() && dirs.len() == 1) || rng.gen_bool(insertion_probability) {
            let path = dirs.choose(rng).unwrap();
            let new_path = path.join(gen_name(rng));

            if rng.gen() {
                log::info!("Creating dir {:?}", new_path.strip_prefix(root_path)?);
                fs::create_dir(&new_path)?;
            } else {
                log::info!("Creating file {:?}", new_path.strip_prefix(root_path)?);
                fs::write(&new_path, "")?;
            }
            record_event(new_path);
        } else if rng.gen_bool(0.05) {
            // Ignore: write a `.gitignore` into a random directory, listing a
            // random subset of that directory's files and subdirectories
            // (paths relative to the directory holding the `.gitignore`).
            let ignore_dir_path = dirs.choose(rng).unwrap();
            let ignore_path = ignore_dir_path.join(&*GITIGNORE);

            let (subdirs, subfiles) = read_dir_recursive(ignore_dir_path.clone());
            let files_to_ignore = {
                let len = rng.gen_range(0..=subfiles.len());
                subfiles.choose_multiple(rng, len)
            };
            let dirs_to_ignore = {
                let len = rng.gen_range(0..subdirs.len());
                subdirs.choose_multiple(rng, len)
            };

            let mut ignore_contents = String::new();
            for path_to_ignore in files_to_ignore.chain(dirs_to_ignore) {
                write!(
                    ignore_contents,
                    "{}\n",
                    path_to_ignore
                        .strip_prefix(&ignore_dir_path)?
                        .to_str()
                        .unwrap()
                )
                .unwrap();
            }
            log::info!(
                "Creating {:?} with contents:\n{}",
                ignore_path.strip_prefix(&root_path)?,
                ignore_contents
            );
            fs::write(&ignore_path, ignore_contents).unwrap();
            record_event(ignore_path);
        } else {
            // Rename or delete: pick a random existing file or directory.
            // `dirs[1..]` skips `dirs[0]` (the root itself), so the root is
            // never renamed or deleted.
            let old_path = {
                let file_path = files.choose(rng);
                let dir_path = dirs[1..].choose(rng);
                file_path.into_iter().chain(dir_path).choose(rng).unwrap()
            };

            let is_rename = rng.gen();
            if is_rename {
                // The destination parent must not be inside the entry being
                // moved, or the rename would be into itself.
                let new_path_parent = dirs
                    .iter()
                    .filter(|d| !d.starts_with(old_path))
                    .choose(rng)
                    .unwrap();

                // Occasionally clobber an existing directory: remove it and
                // rename `old_path` to the removed directory's exact path.
                let overwrite_existing_dir =
                    !old_path.starts_with(&new_path_parent) && rng.gen_bool(0.3);
                let new_path = if overwrite_existing_dir {
                    fs::remove_dir_all(&new_path_parent).ok();
                    new_path_parent.to_path_buf()
                } else {
                    new_path_parent.join(gen_name(rng))
                };

                log::info!(
                    "Renaming {:?} to {}{:?}",
                    old_path.strip_prefix(&root_path)?,
                    if overwrite_existing_dir {
                        "overwrite "
                    } else {
                        ""
                    },
                    new_path.strip_prefix(&root_path)?
                );
                fs::rename(&old_path, &new_path)?;
                // A rename is reported as one event per endpoint.
                record_event(old_path.clone());
                record_event(new_path);
            } else if old_path.is_dir() {
                // Deleting a directory reports an event for every file and
                // directory that was inside it, plus the directory itself
                // (which is included in `dirs` by `read_dir_recursive`).
                let (dirs, files) = read_dir_recursive(old_path.clone());

                log::info!("Deleting dir {:?}", old_path.strip_prefix(&root_path)?);
                fs::remove_dir_all(&old_path).unwrap();
                for file in files {
                    record_event(file);
                }
                for dir in dirs {
                    record_event(dir);
                }
            } else {
                log::info!("Deleting file {:?}", old_path.strip_prefix(&root_path)?);
                fs::remove_file(old_path).unwrap();
                record_event(old_path.clone());
            }
        }

        Ok(events)
    }
2691
2692 fn read_dir_recursive(path: PathBuf) -> (Vec<PathBuf>, Vec<PathBuf>) {
2693 let child_entries = fs::read_dir(&path).unwrap();
2694 let mut dirs = vec![path];
2695 let mut files = Vec::new();
2696 for child_entry in child_entries {
2697 let child_path = child_entry.unwrap().path();
2698 if child_path.is_dir() {
2699 let (child_dirs, child_files) = read_dir_recursive(child_path);
2700 dirs.extend(child_dirs);
2701 files.extend(child_files);
2702 } else {
2703 files.push(child_path);
2704 }
2705 }
2706 (dirs, files)
2707 }
2708
2709 fn gen_name(rng: &mut impl Rng) -> String {
2710 (0..6)
2711 .map(|_| rng.sample(rand::distributions::Alphanumeric))
2712 .map(char::from)
2713 .collect()
2714 }
2715
    impl Snapshot {
        /// Test-only consistency checks over a snapshot. Panics if any
        /// invariant is violated.
        fn check_invariants(&self) {
            // Invariant 1: `files` visits exactly the file entries, in the
            // same order as the entry cursor, and `visible_files` visits the
            // non-ignored subset of those.
            let mut files = self.files(0);
            let mut visible_files = self.visible_files(0);
            for entry in self.entries.cursor::<(), ()>() {
                if entry.is_file() {
                    assert_eq!(files.next().unwrap().inode(), entry.inode);
                    if !entry.is_ignored {
                        assert_eq!(visible_files.next().unwrap().inode(), entry.inode);
                    }
                }
            }
            // Neither iterator may yield entries beyond the cursor's.
            assert!(files.next().is_none());
            assert!(visible_files.next().is_none());

            // Invariant 2: walking the tree via `child_entries` produces the
            // same path sequence as the flat entry cursor. NOTE(review):
            // despite the name, this stack-based walk yields depth-first
            // preorder — children are inserted at a fixed index `ix`, so the
            // first child of the popped entry is popped next.
            let mut bfs_paths = Vec::new();
            let mut stack = vec![Path::new("")];
            while let Some(path) = stack.pop() {
                bfs_paths.push(path);
                let ix = stack.len();
                for child_entry in self.child_entries(path) {
                    stack.insert(ix, child_entry.path());
                }
            }

            let dfs_paths = self
                .entries
                .cursor::<(), ()>()
                .map(|e| e.path().as_ref())
                .collect::<Vec<_>>();
            assert_eq!(bfs_paths, dfs_paths);

            // Invariant 3: every tracked ignore must correspond to an entry
            // for its parent directory and an entry for the `.gitignore`
            // file inside it.
            for (ignore_parent_path, _) in &self.ignores {
                assert!(self.entry_for_path(ignore_parent_path).is_some());
                assert!(self
                    .entry_for_path(ignore_parent_path.join(&*GITIGNORE))
                    .is_some());
            }
        }

        /// Flattens the snapshot into `(path, inode, is_ignored)` tuples
        /// sorted by path, so two snapshots can be compared independently of
        /// their internal tree structure.
        fn to_vec(&self) -> Vec<(&Path, u64, bool)> {
            let mut paths = Vec::new();
            for entry in self.entries.cursor::<(), ()>() {
                paths.push((entry.path().as_ref(), entry.inode(), entry.is_ignored()));
            }
            paths.sort_by(|a, b| a.0.cmp(&b.0));
            paths
        }
    }
2765}