1mod ignore;
2
3use self::ignore::IgnoreStack;
4use crate::{
5 editor::{self, Buffer, History, Operation, Rope},
6 fs::{self, Fs},
7 fuzzy,
8 fuzzy::CharBag,
9 language::LanguageRegistry,
10 rpc::{self, proto, Status},
11 time::{self, ReplicaId},
12 util::{Bias, TryFutureExt},
13};
14use ::ignore::gitignore::Gitignore;
15use anyhow::{anyhow, Result};
16use futures::{Stream, StreamExt};
17pub use fuzzy::{match_paths, PathMatch};
18use gpui::{
19 executor,
20 sum_tree::{self, Cursor, Edit, SumTree},
21 AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task,
22 UpgradeModelHandle, WeakModelHandle,
23};
24use lazy_static::lazy_static;
25use parking_lot::Mutex;
26use postage::{
27 prelude::{Sink as _, Stream as _},
28 watch,
29};
30use serde::Deserialize;
31use smol::channel::{self, Sender};
32use std::{
33 cmp::{self, Ordering},
34 collections::HashMap,
35 convert::{TryFrom, TryInto},
36 ffi::{OsStr, OsString},
37 fmt,
38 future::Future,
39 ops::Deref,
40 path::{Path, PathBuf},
41 sync::{
42 atomic::{AtomicUsize, Ordering::SeqCst},
43 Arc,
44 },
45 time::{Duration, SystemTime},
46};
47use zrpc::{PeerId, TypedEnvelope};
48
lazy_static! {
    // Cached `OsStr` for ".gitignore" so directory scans can compare file
    // names without allocating a new `OsStr` per entry.
    static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
}
52
/// State of the background filesystem scan for a local worktree.
#[derive(Clone, Debug)]
enum ScanState {
    // No scan in progress; the snapshot reflects the last processed events.
    Idle,
    // A scan is currently running; the snapshot may be incomplete.
    Scanning,
    // The scan failed; the error is in an `Arc` so this stays `Clone`.
    Err(Arc<anyhow::Error>),
}
59
/// A directory tree being edited: either rooted on the local filesystem, or
/// a replica of a tree hosted by a collaborator.
pub enum Worktree {
    Local(LocalWorktree),
    Remote(RemoteWorktree),
}
64
impl Entity for Worktree {
    type Event = ();

    /// When the worktree model is dropped, notify the server that we are
    /// leaving the worktree so collaborators see us disconnect.
    fn release(&mut self, cx: &mut MutableAppContext) {
        // A local worktree only has a server-side id once it has registered
        // with the server; a remote worktree always knows its id.
        let rpc = match self {
            Self::Local(tree) => tree
                .remote_id
                .borrow()
                .map(|remote_id| (tree.rpc.clone(), remote_id)),
            Self::Remote(tree) => Some((tree.rpc.clone(), tree.remote_id)),
        };

        if let Some((rpc, worktree_id)) = rpc {
            // Fire-and-forget: the model is going away, so the close message
            // is sent from a detached task and failures are only logged.
            cx.spawn(|_| async move {
                if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await {
                    log::error!("error closing worktree {}: {}", worktree_id, err);
                }
            })
            .detach();
        }
    }
}
87
88impl Worktree {
    /// Opens a worktree rooted at `path` on the local filesystem and starts
    /// the background scanner that keeps its snapshot in sync with disk.
    pub async fn open_local(
        rpc: Arc<rpc::Client>,
        path: impl Into<Arc<Path>>,
        fs: Arc<dyn Fs>,
        languages: Arc<LanguageRegistry>,
        cx: &mut AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let (tree, scan_states_tx) =
            LocalWorktree::new(rpc, path, fs.clone(), languages, cx).await?;
        tree.update(cx, |tree, cx| {
            let tree = tree.as_local_mut().unwrap();
            let abs_path = tree.snapshot.abs_path.clone();
            let background_snapshot = tree.background_snapshot.clone();
            let background = cx.background().clone();
            // The scanner owns the background snapshot and reports progress
            // through `scan_states_tx`; storing the task handle ties the
            // scanner's lifetime to the worktree model.
            tree._background_scanner_task = Some(cx.background().spawn(async move {
                // Filesystem events, debounced to 100ms batches.
                let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
                let scanner =
                    BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
                scanner.run(events).await;
            }));
        });
        Ok(tree)
    }
112
113 pub async fn open_remote(
114 rpc: Arc<rpc::Client>,
115 id: u64,
116 languages: Arc<LanguageRegistry>,
117 cx: &mut AsyncAppContext,
118 ) -> Result<ModelHandle<Self>> {
119 let response = rpc.request(proto::JoinWorktree { worktree_id: id }).await?;
120 Worktree::remote(response, rpc, languages, cx).await
121 }
122
    /// Builds a remote worktree model from the server's join response.
    ///
    /// Entry deserialization happens on a background thread, and two
    /// long-running tasks are started: one folds incoming `UpdateWorktree`
    /// messages into a shared snapshot, the other polls that snapshot into
    /// the foreground model whenever it changes.
    async fn remote(
        join_response: proto::JoinWorktreeResponse,
        rpc: Arc<rpc::Client>,
        languages: Arc<LanguageRegistry>,
        cx: &mut AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let worktree = join_response
            .worktree
            .ok_or_else(|| anyhow!("empty worktree"))?;

        let remote_id = worktree.id;
        let replica_id = join_response.replica_id as ReplicaId;
        let peers = join_response.peers;
        // Lowercased characters of the root name, used for fuzzy matching.
        let root_char_bag: CharBag = worktree
            .root_name
            .chars()
            .map(|c| c.to_ascii_lowercase())
            .collect();
        let root_name = worktree.root_name.clone();
        // Converting every entry can be expensive for large trees, so build
        // both indices in bulk off the main thread.
        let (entries_by_path, entries_by_id) = cx
            .background()
            .spawn(async move {
                let mut entries_by_path_edits = Vec::new();
                let mut entries_by_id_edits = Vec::new();
                for entry in worktree.entries {
                    match Entry::try_from((&root_char_bag, entry)) {
                        Ok(entry) => {
                            entries_by_id_edits.push(Edit::Insert(PathEntry {
                                id: entry.id,
                                path: entry.path.clone(),
                                scan_id: 0,
                            }));
                            entries_by_path_edits.push(Edit::Insert(entry));
                        }
                        // A malformed entry is skipped rather than failing
                        // the whole join.
                        Err(err) => log::warn!("error for remote worktree entry {:?}", err),
                    }
                }

                let mut entries_by_path = SumTree::new();
                let mut entries_by_id = SumTree::new();
                entries_by_path.edit(entries_by_path_edits, &());
                entries_by_id.edit(entries_by_id_edits, &());
                (entries_by_path, entries_by_id)
            })
            .await;

        let worktree = cx.update(|cx| {
            cx.add_model(|cx: &mut ModelContext<Worktree>| {
                let snapshot = Snapshot {
                    id: cx.model_id(),
                    scan_id: 0,
                    // A remote worktree has no local filesystem path.
                    abs_path: Path::new("").into(),
                    root_name,
                    root_char_bag,
                    ignores: Default::default(),
                    entries_by_path,
                    entries_by_id,
                    removed_entry_ids: Default::default(),
                    next_entry_id: Default::default(),
                };

                let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
                let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());

                // Task 1: apply server updates to the shared snapshot.
                cx.background()
                    .spawn(async move {
                        while let Some(update) = updates_rx.recv().await {
                            let mut snapshot = snapshot_tx.borrow().clone();
                            if let Err(error) = snapshot.apply_update(update) {
                                log::error!("error applying worktree update: {}", error);
                            }
                            *snapshot_tx.borrow_mut() = snapshot;
                        }
                    })
                    .detach();

                // Task 2: whenever the shared snapshot changes, refresh the
                // foreground model; exits once the model has been dropped.
                {
                    let mut snapshot_rx = snapshot_rx.clone();
                    cx.spawn_weak(|this, mut cx| async move {
                        while let Some(_) = snapshot_rx.recv().await {
                            if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
                                this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
                            } else {
                                break;
                            }
                        }
                    })
                    .detach();
                }

                // Route worktree-scoped RPC messages to this model; dropping
                // the subscriptions unregisters the handlers.
                let _subscriptions = vec![
                    rpc.subscribe_from_model(remote_id, cx, Self::handle_add_peer),
                    rpc.subscribe_from_model(remote_id, cx, Self::handle_remove_peer),
                    rpc.subscribe_from_model(remote_id, cx, Self::handle_update),
                    rpc.subscribe_from_model(remote_id, cx, Self::handle_update_buffer),
                    rpc.subscribe_from_model(remote_id, cx, Self::handle_buffer_saved),
                ];

                Worktree::Remote(RemoteWorktree {
                    remote_id,
                    replica_id,
                    snapshot,
                    snapshot_rx,
                    updates_tx,
                    rpc: rpc.clone(),
                    open_buffers: Default::default(),
                    peers: peers
                        .into_iter()
                        .map(|p| (PeerId(p.peer_id), p.replica_id as ReplicaId))
                        .collect(),
                    queued_operations: Default::default(),
                    languages,
                    _subscriptions,
                })
            })
        });

        Ok(worktree)
    }
242
243 pub fn as_local(&self) -> Option<&LocalWorktree> {
244 if let Worktree::Local(worktree) = self {
245 Some(worktree)
246 } else {
247 None
248 }
249 }
250
251 pub fn as_local_mut(&mut self) -> Option<&mut LocalWorktree> {
252 if let Worktree::Local(worktree) = self {
253 Some(worktree)
254 } else {
255 None
256 }
257 }
258
259 pub fn as_remote_mut(&mut self) -> Option<&mut RemoteWorktree> {
260 if let Worktree::Remote(worktree) = self {
261 Some(worktree)
262 } else {
263 None
264 }
265 }
266
267 pub fn snapshot(&self) -> Snapshot {
268 match self {
269 Worktree::Local(worktree) => worktree.snapshot(),
270 Worktree::Remote(worktree) => worktree.snapshot(),
271 }
272 }
273
274 pub fn replica_id(&self) -> ReplicaId {
275 match self {
276 Worktree::Local(_) => 0,
277 Worktree::Remote(worktree) => worktree.replica_id,
278 }
279 }
280
281 pub fn handle_add_peer(
282 &mut self,
283 envelope: TypedEnvelope<proto::AddPeer>,
284 _: Arc<rpc::Client>,
285 cx: &mut ModelContext<Self>,
286 ) -> Result<()> {
287 match self {
288 Worktree::Local(worktree) => worktree.add_peer(envelope, cx),
289 Worktree::Remote(worktree) => worktree.add_peer(envelope, cx),
290 }
291 }
292
293 pub fn handle_remove_peer(
294 &mut self,
295 envelope: TypedEnvelope<proto::RemovePeer>,
296 _: Arc<rpc::Client>,
297 cx: &mut ModelContext<Self>,
298 ) -> Result<()> {
299 match self {
300 Worktree::Local(worktree) => worktree.remove_peer(envelope, cx),
301 Worktree::Remote(worktree) => worktree.remove_peer(envelope, cx),
302 }
303 }
304
305 pub fn handle_update(
306 &mut self,
307 envelope: TypedEnvelope<proto::UpdateWorktree>,
308 _: Arc<rpc::Client>,
309 cx: &mut ModelContext<Self>,
310 ) -> anyhow::Result<()> {
311 self.as_remote_mut()
312 .unwrap()
313 .update_from_remote(envelope, cx)
314 }
315
    /// RPC handler: a guest asked to open a buffer in this (local) worktree.
    ///
    /// The open runs asynchronously; the RPC response is delivered from a
    /// detached background task once the buffer is ready.
    pub fn handle_open_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::OpenBuffer>,
        rpc: Arc<rpc::Client>,
        cx: &mut ModelContext<Self>,
    ) -> anyhow::Result<()> {
        let receipt = envelope.receipt();

        let response = self
            .as_local_mut()
            .unwrap()
            .open_remote_buffer(envelope, cx);

        cx.background()
            .spawn(
                async move {
                    rpc.respond(receipt, response.await?).await?;
                    Ok(())
                }
                // Errors are logged rather than propagated — the handler has
                // already returned by the time the task runs.
                .log_err(),
            )
            .detach();

        Ok(())
    }
341
342 pub fn handle_close_buffer(
343 &mut self,
344 envelope: TypedEnvelope<proto::CloseBuffer>,
345 _: Arc<rpc::Client>,
346 cx: &mut ModelContext<Self>,
347 ) -> anyhow::Result<()> {
348 self.as_local_mut()
349 .unwrap()
350 .close_remote_buffer(envelope, cx)
351 }
352
353 pub fn peers(&self) -> &HashMap<PeerId, ReplicaId> {
354 match self {
355 Worktree::Local(worktree) => &worktree.peers,
356 Worktree::Remote(worktree) => &worktree.peers,
357 }
358 }
359
360 pub fn open_buffer(
361 &mut self,
362 path: impl AsRef<Path>,
363 cx: &mut ModelContext<Self>,
364 ) -> Task<Result<ModelHandle<Buffer>>> {
365 match self {
366 Worktree::Local(worktree) => worktree.open_buffer(path.as_ref(), cx),
367 Worktree::Remote(worktree) => worktree.open_buffer(path.as_ref(), cx),
368 }
369 }
370
371 #[cfg(feature = "test-support")]
372 pub fn has_open_buffer(&self, path: impl AsRef<Path>, cx: &AppContext) -> bool {
373 let mut open_buffers: Box<dyn Iterator<Item = _>> = match self {
374 Worktree::Local(worktree) => Box::new(worktree.open_buffers.values()),
375 Worktree::Remote(worktree) => {
376 Box::new(worktree.open_buffers.values().filter_map(|buf| {
377 if let RemoteBuffer::Loaded(buf) = buf {
378 Some(buf)
379 } else {
380 None
381 }
382 }))
383 }
384 };
385
386 let path = path.as_ref();
387 open_buffers
388 .find(|buffer| {
389 if let Some(file) = buffer.upgrade(cx).and_then(|buffer| buffer.read(cx).file()) {
390 file.path.as_ref() == path
391 } else {
392 false
393 }
394 })
395 .is_some()
396 }
397
398 pub fn handle_update_buffer(
399 &mut self,
400 envelope: TypedEnvelope<proto::UpdateBuffer>,
401 _: Arc<rpc::Client>,
402 cx: &mut ModelContext<Self>,
403 ) -> Result<()> {
404 let payload = envelope.payload.clone();
405 let buffer_id = payload.buffer_id as usize;
406 let ops = payload
407 .operations
408 .into_iter()
409 .map(|op| op.try_into())
410 .collect::<anyhow::Result<Vec<_>>>()?;
411
412 match self {
413 Worktree::Local(worktree) => {
414 let buffer = worktree
415 .open_buffers
416 .get(&buffer_id)
417 .and_then(|buf| buf.upgrade(cx))
418 .ok_or_else(|| {
419 anyhow!("invalid buffer {} in update buffer message", buffer_id)
420 })?;
421 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
422 }
423 Worktree::Remote(worktree) => match worktree.open_buffers.get_mut(&buffer_id) {
424 Some(RemoteBuffer::Operations(pending_ops)) => pending_ops.extend(ops),
425 Some(RemoteBuffer::Loaded(buffer)) => {
426 if let Some(buffer) = buffer.upgrade(cx) {
427 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
428 } else {
429 worktree
430 .open_buffers
431 .insert(buffer_id, RemoteBuffer::Operations(ops));
432 }
433 }
434 None => {
435 worktree
436 .open_buffers
437 .insert(buffer_id, RemoteBuffer::Operations(ops));
438 }
439 },
440 }
441
442 Ok(())
443 }
444
    /// RPC handler: a guest asked us to save one of our shared buffers.
    ///
    /// The save runs asynchronously; on success the resulting version vector
    /// and mtime are sent back so the guest can mark its replica as saved.
    pub fn handle_save_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        rpc: Arc<rpc::Client>,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        let sender_id = envelope.original_sender_id()?;
        // Only buffers previously shared with this peer may be saved by it.
        let buffer = self
            .as_local()
            .unwrap()
            .shared_buffers
            .get(&sender_id)
            .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
            .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;

        let receipt = envelope.receipt();
        let worktree_id = envelope.payload.worktree_id;
        let buffer_id = envelope.payload.buffer_id;
        // The save itself must be initiated on the foreground (model) context…
        let save = cx.spawn(|_, mut cx| async move {
            buffer.update(&mut cx, |buffer, cx| buffer.save(cx))?.await
        });

        // …while awaiting its result and responding happens in the background.
        cx.background()
            .spawn(
                async move {
                    let (version, mtime) = save.await?;

                    rpc.respond(
                        receipt,
                        proto::BufferSaved {
                            worktree_id,
                            buffer_id,
                            version: (&version).into(),
                            mtime: Some(mtime.into()),
                        },
                    )
                    .await?;

                    Ok(())
                }
                .log_err(),
            )
            .detach();

        Ok(())
    }
491
492 pub fn handle_buffer_saved(
493 &mut self,
494 envelope: TypedEnvelope<proto::BufferSaved>,
495 _: Arc<rpc::Client>,
496 cx: &mut ModelContext<Self>,
497 ) -> Result<()> {
498 let payload = envelope.payload.clone();
499 let worktree = self.as_remote_mut().unwrap();
500 if let Some(buffer) = worktree
501 .open_buffers
502 .get(&(payload.buffer_id as usize))
503 .and_then(|buf| buf.upgrade(cx))
504 {
505 buffer.update(cx, |buffer, cx| {
506 let version = payload.version.try_into()?;
507 let mtime = payload
508 .mtime
509 .ok_or_else(|| anyhow!("missing mtime"))?
510 .into();
511 buffer.did_save(version, mtime, cx);
512 Result::<_, anyhow::Error>::Ok(())
513 })?;
514 }
515 Ok(())
516 }
517
    /// Copies the latest background snapshot into the foreground model and
    /// reconciles open buffers with it.
    ///
    /// While a local scan is still running, this schedules itself to run
    /// again (immediately under a fake fs in tests, otherwise after 100ms)
    /// instead of reconciling buffers against an incomplete snapshot.
    fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
        match self {
            Self::Local(worktree) => {
                let is_fake_fs = worktree.fs.is_fake();
                worktree.snapshot = worktree.background_snapshot.lock().clone();
                if worktree.is_scanning() {
                    // Only one re-poll may be pending at a time.
                    if worktree.poll_task.is_none() {
                        worktree.poll_task = Some(cx.spawn(|this, mut cx| async move {
                            if is_fake_fs {
                                smol::future::yield_now().await;
                            } else {
                                smol::Timer::after(Duration::from_millis(100)).await;
                            }
                            this.update(&mut cx, |this, cx| {
                                this.as_local_mut().unwrap().poll_task = None;
                                this.poll_snapshot(cx);
                            })
                        }));
                    }
                } else {
                    worktree.poll_task.take();
                    self.update_open_buffers(cx);
                }
            }
            Self::Remote(worktree) => {
                worktree.snapshot = worktree.snapshot_rx.borrow().clone();
                self.update_open_buffers(cx);
            }
        };

        cx.notify();
    }
550
    /// Reconciles every open buffer's `File` metadata with the current
    /// snapshot: paths may have changed (renames), mtimes may have advanced
    /// (edits on disk), and entries may have disappeared (deletions).
    fn update_open_buffers(&mut self, cx: &mut ModelContext<Self>) {
        let open_buffers: Box<dyn Iterator<Item = _>> = match &self {
            Self::Local(worktree) => Box::new(worktree.open_buffers.iter()),
            Self::Remote(worktree) => {
                // Skip queued-operation placeholders; only loaded buffers
                // carry file metadata to reconcile.
                Box::new(worktree.open_buffers.iter().filter_map(|(id, buf)| {
                    if let RemoteBuffer::Loaded(buf) = buf {
                        Some((id, buf))
                    } else {
                        None
                    }
                }))
            }
        };

        // Buffers whose models were dropped are collected here and removed
        // after iteration, to avoid mutating the map while borrowing it.
        let mut buffers_to_delete = Vec::new();
        for (buffer_id, buffer) in open_buffers {
            if let Some(buffer) = buffer.upgrade(cx) {
                buffer.update(cx, |buffer, cx| {
                    let buffer_is_clean = !buffer.is_dirty();

                    if let Some(file) = buffer.file_mut() {
                        let mut file_changed = false;

                        if let Some(entry) = file
                            .entry_id
                            .and_then(|entry_id| self.entry_for_id(entry_id))
                        {
                            // The entry still exists: track renames…
                            if entry.path != file.path {
                                file.path = entry.path.clone();
                                file_changed = true;
                            }

                            // …and on-disk modifications. Clean buffers are
                            // reloaded from disk; dirty ones keep their edits.
                            if entry.mtime != file.mtime {
                                file.mtime = entry.mtime;
                                file_changed = true;
                                if let Some(worktree) = self.as_local() {
                                    if buffer_is_clean {
                                        let abs_path = worktree.absolutize(&file.path);
                                        refresh_buffer(abs_path, &worktree.fs, cx);
                                    }
                                }
                            }
                        } else if let Some(entry) = self.entry_for_path(&file.path) {
                            // The old entry is gone but a file exists at the
                            // same path again (e.g. delete then recreate).
                            file.entry_id = Some(entry.id);
                            file.mtime = entry.mtime;
                            if let Some(worktree) = self.as_local() {
                                if buffer_is_clean {
                                    let abs_path = worktree.absolutize(&file.path);
                                    refresh_buffer(abs_path, &worktree.fs, cx);
                                }
                            }
                            file_changed = true;
                        } else if !file.is_deleted() {
                            // The file vanished from disk; a clean buffer
                            // becomes dirty so the contents aren't lost.
                            if buffer_is_clean {
                                cx.emit(editor::buffer::Event::Dirtied);
                            }
                            file.entry_id = None;
                            file_changed = true;
                        }

                        if file_changed {
                            cx.emit(editor::buffer::Event::FileHandleChanged);
                        }
                    }
                });
            } else {
                buffers_to_delete.push(*buffer_id);
            }
        }

        for buffer_id in buffers_to_delete {
            match self {
                Self::Local(worktree) => {
                    worktree.open_buffers.remove(&buffer_id);
                }
                Self::Remote(worktree) => {
                    worktree.open_buffers.remove(&buffer_id);
                }
            }
        }
    }
632}
633
634impl Deref for Worktree {
635 type Target = Snapshot;
636
637 fn deref(&self) -> &Self::Target {
638 match self {
639 Worktree::Local(worktree) => &worktree.snapshot,
640 Worktree::Remote(worktree) => &worktree.snapshot,
641 }
642 }
643}
644
/// A worktree backed by the local filesystem. It owns the background scanner
/// and, when shared, streams snapshots to collaborators.
pub struct LocalWorktree {
    // Foreground copy of the tree state, refreshed by `poll_snapshot`.
    snapshot: Snapshot,
    // Settings parsed from `.zed.toml` at the worktree root.
    config: WorktreeConfig,
    // Snapshot mutated by the background scanner.
    background_snapshot: Arc<Mutex<Snapshot>>,
    // Most recent scan state reported by the scanner.
    last_scan_state_rx: watch::Receiver<ScanState>,
    _background_scanner_task: Option<Task<()>>,
    _maintain_remote_id_task: Task<Option<()>>,
    // Pending re-poll scheduled while a scan is still in progress.
    poll_task: Option<Task<()>>,
    // Server-assigned id; `None` until registered with the server.
    remote_id: watch::Receiver<Option<u64>>,
    // Present while the worktree is being shared with collaborators.
    share: Option<ShareState>,
    // Weak handles to buffers opened from this worktree, keyed by buffer id.
    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
    // Strong handles to buffers opened on behalf of guests, per peer.
    shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
    peers: HashMap<PeerId, ReplicaId>,
    languages: Arc<LanguageRegistry>,
    queued_operations: Vec<(u64, Operation)>,
    rpc: Arc<rpc::Client>,
    fs: Arc<dyn Fs>,
}
663
/// Per-worktree configuration loaded from `.zed.toml` at the root.
#[derive(Default, Deserialize)]
struct WorktreeConfig {
    // Collaborator logins sent to the server when registering the worktree
    // (see the `OpenWorktree` request in `LocalWorktree::new`).
    collaborators: Vec<String>,
}
668
669impl LocalWorktree {
    /// Creates the worktree model for a local path.
    ///
    /// Returns the model plus the sender the background scanner uses to
    /// report scan progress; the caller is responsible for actually starting
    /// the scanner (see `Worktree::open_local`).
    async fn new(
        rpc: Arc<rpc::Client>,
        path: impl Into<Arc<Path>>,
        fs: Arc<dyn Fs>,
        languages: Arc<LanguageRegistry>,
        cx: &mut AsyncAppContext,
    ) -> Result<(ModelHandle<Worktree>, Sender<ScanState>)> {
        let abs_path = path.into();
        // Entries are stored relative to the root, whose own path is empty.
        let path: Arc<Path> = Arc::from(Path::new(""));
        let next_entry_id = AtomicUsize::new(0);

        // After determining whether the root entry is a file or a directory, populate the
        // snapshot's "root name", which will be used for the purpose of fuzzy matching.
        let root_name = abs_path
            .file_name()
            .map_or(String::new(), |f| f.to_string_lossy().to_string());
        let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
        let metadata = fs.metadata(&abs_path).await?;

        // A missing or malformed `.zed.toml` silently falls back to defaults.
        let mut config = WorktreeConfig::default();
        if let Ok(zed_toml) = fs.load(&abs_path.join(".zed.toml")).await {
            if let Ok(parsed) = toml::from_str(&zed_toml) {
                config = parsed;
            }
        }

        let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
        let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
        let tree = cx.add_model(move |cx: &mut ModelContext<Worktree>| {
            let mut snapshot = Snapshot {
                id: cx.model_id(),
                scan_id: 0,
                abs_path,
                root_name: root_name.clone(),
                root_char_bag,
                ignores: Default::default(),
                entries_by_path: Default::default(),
                entries_by_id: Default::default(),
                removed_entry_ids: Default::default(),
                next_entry_id: Arc::new(next_entry_id),
            };
            // Seed the snapshot with the root entry when the path exists.
            if let Some(metadata) = metadata {
                snapshot.insert_entry(Entry::new(
                    path.into(),
                    &metadata,
                    &snapshot.next_entry_id,
                    snapshot.root_char_bag,
                ));
            }

            // Keep the server-side worktree id in sync with the connection:
            // (re)register whenever we connect, clear it otherwise.
            let (mut remote_id_tx, remote_id_rx) = watch::channel();
            let _maintain_remote_id_task = cx.spawn_weak({
                let rpc = rpc.clone();
                move |this, cx| {
                    async move {
                        let mut status = rpc.status();
                        while let Some(status) = status.recv().await {
                            if let Some(this) = this.upgrade(&cx) {
                                let remote_id = if let Status::Connected { .. } = status {
                                    let collaborator_logins = this.read_with(&cx, |this, _| {
                                        this.as_local().unwrap().config.collaborators.clone()
                                    });
                                    let response = rpc
                                        .request(proto::OpenWorktree {
                                            root_name: root_name.clone(),
                                            collaborator_logins,
                                        })
                                        .await?;

                                    Some(response.worktree_id)
                                } else {
                                    None
                                };
                                // The receiver is dropped with the worktree;
                                // stop the loop once nobody is listening.
                                if remote_id_tx.send(remote_id).await.is_err() {
                                    break;
                                }
                            }
                        }
                        Ok(())
                    }
                    .log_err()
                }
            });

            let tree = Self {
                snapshot: snapshot.clone(),
                config,
                remote_id: remote_id_rx,
                background_snapshot: Arc::new(Mutex::new(snapshot)),
                last_scan_state_rx,
                _background_scanner_task: None,
                _maintain_remote_id_task,
                share: None,
                poll_task: None,
                open_buffers: Default::default(),
                shared_buffers: Default::default(),
                queued_operations: Default::default(),
                peers: Default::default(),
                languages,
                rpc,
                fs,
            };

            // Forward scan-state changes into the model: refresh the snapshot
            // and, when sharing, push each completed snapshot to collaborators.
            cx.spawn_weak(|this, mut cx| async move {
                while let Ok(scan_state) = scan_states_rx.recv().await {
                    if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
                        let to_send = handle.update(&mut cx, |this, cx| {
                            last_scan_state_tx.blocking_send(scan_state).ok();
                            this.poll_snapshot(cx);
                            let tree = this.as_local_mut().unwrap();
                            if !tree.is_scanning() {
                                if let Some(share) = tree.share.as_ref() {
                                    return Some((tree.snapshot(), share.snapshots_tx.clone()));
                                }
                            }
                            None
                        });

                        if let Some((snapshot, snapshots_to_send_tx)) = to_send {
                            if let Err(err) = snapshots_to_send_tx.send(snapshot).await {
                                log::error!("error submitting snapshot to send {}", err);
                            }
                        }
                    } else {
                        // The worktree was dropped; stop listening.
                        break;
                    }
                }
            })
            .detach();

            Worktree::Local(tree)
        });

        Ok((tree, scan_states_tx))
    }
805
    /// Opens (or returns the already-open) buffer for a worktree-relative
    /// `path`, loading its contents from disk and detecting its language.
    pub fn open_buffer(
        &mut self,
        path: &Path,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        let handle = cx.handle();

        // If there is already a buffer for the given path, then return it.
        // `retain` doubles as garbage collection for dropped buffer handles.
        let mut existing_buffer = None;
        self.open_buffers.retain(|_buffer_id, buffer| {
            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                if let Some(file) = buffer.read(cx.as_ref()).file() {
                    if file.worktree_id() == handle.id() && file.path.as_ref() == path {
                        existing_buffer = Some(buffer);
                    }
                }
                true
            } else {
                false
            }
        });

        let languages = self.languages.clone();
        let path = Arc::from(path);
        cx.spawn(|this, mut cx| async move {
            if let Some(existing_buffer) = existing_buffer {
                Ok(existing_buffer)
            } else {
                let (file, contents) = this
                    .update(&mut cx, |this, cx| this.as_local().unwrap().load(&path, cx))
                    .await?;
                let language = languages.select_language(&path).cloned();
                let buffer = cx.add_model(|cx| {
                    // Replica id 0: the host is always replica zero.
                    Buffer::from_history(0, History::new(contents.into()), Some(file), language, cx)
                });
                this.update(&mut cx, |this, _| {
                    let this = this
                        .as_local_mut()
                        .ok_or_else(|| anyhow!("must be a local worktree"))?;
                    this.open_buffers.insert(buffer.id(), buffer.downgrade());
                    Ok(buffer)
                })
            }
        })
    }
851
    /// Opens a buffer on behalf of a guest, recording a strong handle in
    /// `shared_buffers` so the buffer stays alive while the guest has it
    /// open, and returns its serialized contents for the RPC response.
    pub fn open_remote_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::OpenBuffer>,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<proto::OpenBufferResponse>> {
        let peer_id = envelope.original_sender_id();
        let path = Path::new(&envelope.payload.path);

        let buffer = self.open_buffer(path, cx);

        cx.spawn(|this, mut cx| async move {
            let buffer = buffer.await?;
            this.update(&mut cx, |this, cx| {
                this.as_local_mut()
                    .unwrap()
                    .shared_buffers
                    .entry(peer_id?)
                    .or_default()
                    .insert(buffer.id() as u64, buffer.clone());

                Ok(proto::OpenBufferResponse {
                    buffer: Some(buffer.update(cx.as_mut(), |buffer, cx| buffer.to_proto(cx))),
                })
            })
        })
    }
878
879 pub fn close_remote_buffer(
880 &mut self,
881 envelope: TypedEnvelope<proto::CloseBuffer>,
882 cx: &mut ModelContext<Worktree>,
883 ) -> Result<()> {
884 if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
885 shared_buffers.remove(&envelope.payload.buffer_id);
886 cx.notify();
887 }
888
889 Ok(())
890 }
891
892 pub fn add_peer(
893 &mut self,
894 envelope: TypedEnvelope<proto::AddPeer>,
895 cx: &mut ModelContext<Worktree>,
896 ) -> Result<()> {
897 let peer = envelope
898 .payload
899 .peer
900 .as_ref()
901 .ok_or_else(|| anyhow!("empty peer"))?;
902 self.peers
903 .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
904 cx.notify();
905
906 Ok(())
907 }
908
909 pub fn remove_peer(
910 &mut self,
911 envelope: TypedEnvelope<proto::RemovePeer>,
912 cx: &mut ModelContext<Worktree>,
913 ) -> Result<()> {
914 let peer_id = PeerId(envelope.payload.peer_id);
915 let replica_id = self
916 .peers
917 .remove(&peer_id)
918 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
919 self.shared_buffers.remove(&peer_id);
920 for (_, buffer) in &self.open_buffers {
921 if let Some(buffer) = buffer.upgrade(cx) {
922 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
923 }
924 }
925 cx.notify();
926
927 Ok(())
928 }
929
930 pub fn scan_complete(&self) -> impl Future<Output = ()> {
931 let mut scan_state_rx = self.last_scan_state_rx.clone();
932 async move {
933 let mut scan_state = Some(scan_state_rx.borrow().clone());
934 while let Some(ScanState::Scanning) = scan_state {
935 scan_state = scan_state_rx.recv().await;
936 }
937 }
938 }
939
940 pub fn next_remote_id(&self) -> impl Future<Output = Option<u64>> {
941 let mut remote_id = self.remote_id.clone();
942 async move {
943 while let Some(remote_id) = remote_id.recv().await {
944 if remote_id.is_some() {
945 return remote_id;
946 }
947 }
948 None
949 }
950 }
951
952 fn is_scanning(&self) -> bool {
953 if let ScanState::Scanning = *self.last_scan_state_rx.borrow() {
954 true
955 } else {
956 false
957 }
958 }
959
960 pub fn snapshot(&self) -> Snapshot {
961 self.snapshot.clone()
962 }
963
964 pub fn abs_path(&self) -> &Path {
965 self.snapshot.abs_path.as_ref()
966 }
967
968 pub fn contains_abs_path(&self, path: &Path) -> bool {
969 path.starts_with(&self.snapshot.abs_path)
970 }
971
972 fn absolutize(&self, path: &Path) -> PathBuf {
973 if path.file_name().is_some() {
974 self.snapshot.abs_path.join(path)
975 } else {
976 self.snapshot.abs_path.to_path_buf()
977 }
978 }
979
    /// Loads the file at the worktree-relative `path` from disk, returning
    /// its `File` handle and contents.
    fn load(&self, path: &Path, cx: &mut ModelContext<Worktree>) -> Task<Result<(File, String)>> {
        let handle = cx.handle();
        let path = Arc::from(path);
        let abs_path = self.absolutize(&path);
        let background_snapshot = self.background_snapshot.clone();
        let fs = self.fs.clone();
        cx.spawn(|this, mut cx| async move {
            let text = fs.load(&abs_path).await?;
            // Eagerly populate the snapshot with an updated entry for the loaded file
            let entry = refresh_entry(fs.as_ref(), &background_snapshot, path, &abs_path).await?;
            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
            Ok((File::new(entry.id, handle, entry.path, entry.mtime), text))
        })
    }
994
    /// Saves `text` to `path` and registers `buffer` as the open buffer for
    /// that path, returning the resulting `File` handle.
    pub fn save_buffer_as(
        &self,
        buffer: ModelHandle<Buffer>,
        path: impl Into<Arc<Path>>,
        text: Rope,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<File>> {
        let save = self.save(path, text, cx);
        cx.spawn(|this, mut cx| async move {
            let entry = save.await?;
            this.update(&mut cx, |this, cx| {
                // Track the buffer under its (possibly new) path.
                this.as_local_mut()
                    .unwrap()
                    .open_buffers
                    .insert(buffer.id(), buffer.downgrade());
                Ok(File::new(entry.id, cx.handle(), entry.path, entry.mtime))
            })
        })
    }
1014
    /// Writes `text` to `path` on disk, refreshes the corresponding snapshot
    /// entry, and re-polls the snapshot once the write completes.
    fn save(
        &self,
        path: impl Into<Arc<Path>>,
        text: Rope,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<Entry>> {
        let path = path.into();
        let abs_path = self.absolutize(&path);
        let background_snapshot = self.background_snapshot.clone();
        let fs = self.fs.clone();
        // Disk I/O happens on the background executor…
        let save = cx.background().spawn(async move {
            fs.save(&abs_path, &text).await?;
            refresh_entry(fs.as_ref(), &background_snapshot, path.clone(), &abs_path).await
        });

        // …then the foreground model is refreshed with the new entry.
        cx.spawn(|this, mut cx| async move {
            let entry = save.await?;
            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
            Ok(entry)
        })
    }
1036
    /// Shares this worktree with collaborators.
    ///
    /// Registers the worktree with the server, starts a background task that
    /// streams snapshot diffs to it, and subscribes to the buffer-related
    /// messages guests may send. Resolves to the server-assigned worktree id.
    pub fn share(&mut self, cx: &mut ModelContext<Worktree>) -> Task<anyhow::Result<u64>> {
        let snapshot = self.snapshot();
        let share_request = self.share_request(cx);
        let rpc = self.rpc.clone();
        cx.spawn(|this, mut cx| async move {
            // `share_request` resolves to `None` if we never obtain a
            // server-side worktree id.
            let share_request = if let Some(request) = share_request.await {
                request
            } else {
                return Err(anyhow!("failed to open worktree on the server"));
            };

            let remote_id = share_request.worktree.as_ref().unwrap().id;
            let share_response = rpc.request(share_request).await?;

            log::info!("sharing worktree {:?}", share_response);
            let (snapshots_to_send_tx, snapshots_to_send_rx) =
                smol::channel::unbounded::<Snapshot>();

            // Stream each completed snapshot to the server as a diff against
            // the previously sent one; on a send error the previous snapshot
            // is kept so the next diff still covers the missed changes.
            cx.background()
                .spawn({
                    let rpc = rpc.clone();
                    async move {
                        let mut prev_snapshot = snapshot;
                        while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
                            let message = snapshot.build_update(&prev_snapshot, remote_id);
                            match rpc.send(message).await {
                                Ok(()) => prev_snapshot = snapshot,
                                Err(err) => log::error!("error sending snapshot diff {}", err),
                            }
                        }
                    }
                })
                .detach();

            this.update(&mut cx, |worktree, cx| {
                // These handlers stay registered for as long as the
                // `ShareState` (and its subscriptions) is kept alive.
                let _subscriptions = vec![
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_add_peer),
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_remove_peer),
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_open_buffer),
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_close_buffer),
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_update_buffer),
                    rpc.subscribe_from_model(remote_id, cx, Worktree::handle_save_buffer),
                ];

                let worktree = worktree.as_local_mut().unwrap();
                worktree.share = Some(ShareState {
                    snapshots_tx: snapshots_to_send_tx,
                    _subscriptions,
                });
            });

            Ok(remote_id)
        })
    }
1091
    /// Builds the `ShareWorktree` request containing a full serialized copy
    /// of the current snapshot. Resolves to `None` if the worktree never
    /// obtains a server-side id.
    fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<Option<proto::ShareWorktree>> {
        let remote_id = self.next_remote_id();
        let snapshot = self.snapshot();
        let root_name = self.root_name.clone();
        // Serializing every entry can be expensive for a large tree, so the
        // request is assembled on the background executor.
        cx.background().spawn(async move {
            remote_id.await.map(|id| {
                let entries = snapshot
                    .entries_by_path
                    .cursor::<(), ()>()
                    .map(Into::into)
                    .collect();
                proto::ShareWorktree {
                    worktree: Some(proto::Worktree {
                        id,
                        root_name,
                        entries,
                    }),
                }
            })
        })
    }
1113}
1114
/// Reloads `abs_path` from disk and replaces the buffer's contents with it.
///
/// Used when a clean buffer's file changes on disk; load errors are logged
/// rather than surfaced, since this runs from background reconciliation.
pub fn refresh_buffer(abs_path: PathBuf, fs: &Arc<dyn Fs>, cx: &mut ModelContext<Buffer>) {
    let fs = fs.clone();
    cx.spawn(|buffer, mut cx| async move {
        let new_text = fs.load(&abs_path).await;
        match new_text {
            Err(error) => log::error!("error refreshing buffer after file changed: {}", error),
            Ok(new_text) => {
                buffer
                    .update(&mut cx, |buffer, cx| {
                        buffer.set_text_from_disk(new_text.into(), cx)
                    })
                    .await;
            }
        }
    })
    .detach()
}
1132
1133impl Deref for LocalWorktree {
1134 type Target = Snapshot;
1135
1136 fn deref(&self) -> &Self::Target {
1137 &self.snapshot
1138 }
1139}
1140
1141impl fmt::Debug for LocalWorktree {
1142 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1143 self.snapshot.fmt(f)
1144 }
1145}
1146
/// State held by a local worktree while it is being shared: the channel used
/// to stream completed snapshots to the server, plus the RPC handler
/// subscriptions that stay registered for the duration of the share.
struct ShareState {
    snapshots_tx: Sender<Snapshot>,
    _subscriptions: Vec<rpc::Subscription>,
}
1151
/// A replica of a worktree hosted by another collaborator, kept in sync via
/// `UpdateWorktree` messages from the server.
pub struct RemoteWorktree {
    // Server-assigned id of the worktree being replicated.
    remote_id: u64,
    // Foreground copy of the tree state, refreshed by `poll_snapshot`.
    snapshot: Snapshot,
    // Receives the snapshot maintained by the background update task.
    snapshot_rx: watch::Receiver<Snapshot>,
    rpc: Arc<rpc::Client>,
    // Feeds incoming `UpdateWorktree` messages to the background task.
    updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
    // This client's replica id within the worktree.
    replica_id: ReplicaId,
    // Buffers opened from this worktree, or operations queued for buffers
    // that are still loading; keyed by buffer id.
    open_buffers: HashMap<usize, RemoteBuffer>,
    peers: HashMap<PeerId, ReplicaId>,
    languages: Arc<LanguageRegistry>,
    queued_operations: Vec<(u64, Operation)>,
    _subscriptions: Vec<rpc::Subscription>,
}
1165
impl RemoteWorktree {
    /// Opens (or reuses) a buffer for `path` in this remote worktree.
    ///
    /// First prunes dropped buffers from `open_buffers` and, while doing so,
    /// looks for an already-open buffer at the same path so it can be reused.
    /// Otherwise requests the buffer over RPC, constructs it from the proto
    /// payload, and applies any operations that arrived before the load
    /// completed.
    pub fn open_buffer(
        &mut self,
        path: &Path,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        let handle = cx.handle();
        let mut existing_buffer = None;
        // Drop entries whose buffers have been released; reuse a live buffer
        // that already represents this worktree/path pair.
        self.open_buffers.retain(|_buffer_id, buffer| {
            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                if let Some(file) = buffer.read(cx.as_ref()).file() {
                    if file.worktree_id() == handle.id() && file.path.as_ref() == path {
                        existing_buffer = Some(buffer);
                    }
                }
                true
            } else {
                false
            }
        });

        let rpc = self.rpc.clone();
        let languages = self.languages.clone();
        let replica_id = self.replica_id;
        let remote_worktree_id = self.remote_id;
        // NOTE(review): the path is sent over the wire lossily as UTF-8;
        // non-UTF-8 paths would be mangled — presumably acceptable here.
        let path = path.to_string_lossy().to_string();
        cx.spawn(|this, mut cx| async move {
            if let Some(existing_buffer) = existing_buffer {
                Ok(existing_buffer)
            } else {
                // The entry must already exist in our replicated snapshot.
                let entry = this
                    .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned())
                    .ok_or_else(|| anyhow!("file does not exist"))?;
                let file = File::new(entry.id, handle, entry.path, entry.mtime);
                let language = languages.select_language(&path).cloned();
                let response = rpc
                    .request(proto::OpenBuffer {
                        worktree_id: remote_worktree_id as u64,
                        path,
                    })
                    .await?;
                let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
                let buffer_id = remote_buffer.id as usize;
                let buffer = cx.add_model(|cx| {
                    Buffer::from_proto(replica_id, remote_buffer, Some(file), language, cx).unwrap()
                });
                this.update(&mut cx, |this, cx| {
                    let this = this.as_remote_mut().unwrap();
                    // Replace any queued operations with the loaded buffer,
                    // applying those pending operations to it first.
                    if let Some(RemoteBuffer::Operations(pending_ops)) = this
                        .open_buffers
                        .insert(buffer_id, RemoteBuffer::Loaded(buffer.downgrade()))
                    {
                        buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?;
                    }
                    Result::<_, anyhow::Error>::Ok(())
                })?;
                Ok(buffer)
            }
        })
    }

    fn snapshot(&self) -> Snapshot {
        self.snapshot.clone()
    }

    /// Forwards an incoming `UpdateWorktree` message to the background task
    /// that applies updates in order. The send is spawned so the handler
    /// itself never blocks on channel capacity.
    fn update_from_remote(
        &mut self,
        envelope: TypedEnvelope<proto::UpdateWorktree>,
        cx: &mut ModelContext<Worktree>,
    ) -> Result<()> {
        let mut tx = self.updates_tx.clone();
        let payload = envelope.payload.clone();
        cx.background()
            .spawn(async move {
                tx.send(payload).await.expect("receiver runs to completion");
            })
            .detach();

        Ok(())
    }

    /// Registers a newly joined collaborator and its replica id.
    pub fn add_peer(
        &mut self,
        envelope: TypedEnvelope<proto::AddPeer>,
        cx: &mut ModelContext<Worktree>,
    ) -> Result<()> {
        let peer = envelope
            .payload
            .peer
            .as_ref()
            .ok_or_else(|| anyhow!("empty peer"))?;
        self.peers
            .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
        cx.notify();
        Ok(())
    }

    /// Removes a departed collaborator and clears its presence (selections,
    /// etc.) from every open buffer.
    pub fn remove_peer(
        &mut self,
        envelope: TypedEnvelope<proto::RemovePeer>,
        cx: &mut ModelContext<Worktree>,
    ) -> Result<()> {
        let peer_id = PeerId(envelope.payload.peer_id);
        let replica_id = self
            .peers
            .remove(&peer_id)
            .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
        for (_, buffer) in &self.open_buffers {
            if let Some(buffer) = buffer.upgrade(cx) {
                buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
            }
        }
        cx.notify();
        Ok(())
    }
}
1282
/// The state of a buffer opened from a remote worktree: either operations
/// received before the buffer finished loading, or a weak handle to the
/// loaded buffer model.
enum RemoteBuffer {
    Operations(Vec<Operation>),
    Loaded(WeakModelHandle<Buffer>),
}
1287
1288impl RemoteBuffer {
1289 fn upgrade(&self, cx: &impl UpgradeModelHandle) -> Option<ModelHandle<Buffer>> {
1290 match self {
1291 Self::Operations(_) => None,
1292 Self::Loaded(buffer) => buffer.upgrade(cx),
1293 }
1294 }
1295}
1296
/// An immutable-once-cloned view of a worktree's file hierarchy.
#[derive(Clone)]
pub struct Snapshot {
    // Id of the worktree this snapshot belongs to.
    id: usize,
    // Incremented on every scan pass; used to detect stale entries.
    scan_id: usize,
    // Absolute path of the worktree root on disk.
    abs_path: Arc<Path>,
    root_name: String,
    // Character bag of the root name, used to seed fuzzy matching.
    root_char_bag: CharBag,
    // Parsed .gitignore per directory, with the scan_id it was last seen in.
    ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
    // Primary index: entries ordered by path.
    entries_by_path: SumTree<Entry>,
    // Secondary index: entry id -> path, ordered by id.
    entries_by_id: SumTree<PathEntry>,
    // inode -> previously used entry id; enables id reuse across rename/recreate.
    removed_entry_ids: HashMap<u64, usize>,
    next_entry_id: Arc<AtomicUsize>,
}
1310
impl Snapshot {
    pub fn id(&self) -> usize {
        self.id
    }

    /// Computes the diff that transforms `other` into `self`, expressed as a
    /// protobuf `UpdateWorktree` message (entries to upsert plus entry ids to
    /// remove). Both id-ordered trees are walked in lockstep like a
    /// sorted-list merge.
    pub fn build_update(&self, other: &Self, worktree_id: u64) -> proto::UpdateWorktree {
        let mut updated_entries = Vec::new();
        let mut removed_entries = Vec::new();
        let mut self_entries = self.entries_by_id.cursor::<(), ()>().peekable();
        let mut other_entries = other.entries_by_id.cursor::<(), ()>().peekable();
        loop {
            match (self_entries.peek(), other_entries.peek()) {
                (Some(self_entry), Some(other_entry)) => match self_entry.id.cmp(&other_entry.id) {
                    Ordering::Less => {
                        // Present only in `self`: a newly added entry.
                        let entry = self.entry_for_id(self_entry.id).unwrap().into();
                        updated_entries.push(entry);
                        self_entries.next();
                    }
                    Ordering::Equal => {
                        // Present in both: include only if rescanned since `other`.
                        if self_entry.scan_id != other_entry.scan_id {
                            let entry = self.entry_for_id(self_entry.id).unwrap().into();
                            updated_entries.push(entry);
                        }

                        self_entries.next();
                        other_entries.next();
                    }
                    Ordering::Greater => {
                        // Present only in `other`: removed in `self`.
                        removed_entries.push(other_entry.id as u64);
                        other_entries.next();
                    }
                },
                (Some(self_entry), None) => {
                    let entry = self.entry_for_id(self_entry.id).unwrap().into();
                    updated_entries.push(entry);
                    self_entries.next();
                }
                (None, Some(other_entry)) => {
                    removed_entries.push(other_entry.id as u64);
                    other_entries.next();
                }
                (None, None) => break,
            }
        }

        proto::UpdateWorktree {
            updated_entries,
            removed_entries,
            worktree_id,
        }
    }

    /// Applies a remote `UpdateWorktree` diff to this snapshot, keeping both
    /// the by-path and by-id indices consistent. Bumps `scan_id` so applied
    /// entries are stamped with a fresh scan.
    fn apply_update(&mut self, update: proto::UpdateWorktree) -> Result<()> {
        self.scan_id += 1;
        let scan_id = self.scan_id;

        let mut entries_by_path_edits = Vec::new();
        let mut entries_by_id_edits = Vec::new();
        for entry_id in update.removed_entries {
            let entry_id = entry_id as usize;
            let entry = self
                .entry_for_id(entry_id)
                .ok_or_else(|| anyhow!("unknown entry"))?;
            entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
            entries_by_id_edits.push(Edit::Remove(entry.id));
        }

        for entry in update.updated_entries {
            let entry = Entry::try_from((&self.root_char_bag, entry))?;
            // If this id previously mapped to a different path, remove the
            // stale path record before inserting the new one.
            if let Some(PathEntry { path, .. }) = self.entries_by_id.get(&entry.id, &()) {
                entries_by_path_edits.push(Edit::Remove(PathKey(path.clone())));
            }
            entries_by_id_edits.push(Edit::Insert(PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                scan_id,
            }));
            entries_by_path_edits.push(Edit::Insert(entry));
        }

        self.entries_by_path.edit(entries_by_path_edits, &());
        self.entries_by_id.edit(entries_by_id_edits, &());

        Ok(())
    }

    /// Total number of file entries (tracked by the tree's summary).
    pub fn file_count(&self) -> usize {
        self.entries_by_path.summary().file_count
    }

    /// Number of file entries not hidden by gitignore rules.
    pub fn visible_file_count(&self) -> usize {
        self.entries_by_path.summary().visible_file_count
    }

    pub fn files(&self, start: usize) -> FileIter {
        FileIter::all(self, start)
    }

    /// Iterates over every entry's path, excluding the root (empty) path.
    pub fn paths(&self) -> impl Iterator<Item = &Arc<Path>> {
        let empty_path = Path::new("");
        self.entries_by_path
            .cursor::<(), ()>()
            .filter(move |entry| entry.path.as_ref() != empty_path)
            .map(|entry| &entry.path)
    }

    pub fn visible_files(&self, start: usize) -> FileIter {
        FileIter::visible(self, start)
    }

    /// Iterates over the immediate children of the directory at `path`.
    fn child_entries<'a>(&'a self, path: &'a Path) -> ChildEntriesIter<'a> {
        ChildEntriesIter::new(path, self)
    }

    pub fn root_entry(&self) -> Option<&Entry> {
        self.entry_for_path("")
    }

    pub fn root_name(&self) -> &str {
        &self.root_name
    }

    /// Looks up the entry at exactly `path`, if present.
    fn entry_for_path(&self, path: impl AsRef<Path>) -> Option<&Entry> {
        let mut cursor = self.entries_by_path.cursor::<_, ()>();
        if cursor.seek(&PathSearch::Exact(path.as_ref()), Bias::Left, &()) {
            cursor.item()
        } else {
            None
        }
    }

    /// Resolves an entry id to its entry via the id -> path index.
    fn entry_for_id(&self, id: usize) -> Option<&Entry> {
        let entry = self.entries_by_id.get(&id, &())?;
        self.entry_for_path(&entry.path)
    }

    pub fn inode_for_path(&self, path: impl AsRef<Path>) -> Option<u64> {
        self.entry_for_path(path.as_ref()).map(|e| e.inode)
    }

    /// Inserts (or replaces) a single entry in both indices. If the entry is
    /// a `.gitignore` file, (re)parses it and records it against its parent
    /// directory with the current scan id.
    fn insert_entry(&mut self, mut entry: Entry) -> Entry {
        if !entry.is_dir() && entry.path.file_name() == Some(&GITIGNORE) {
            let (ignore, err) = Gitignore::new(self.abs_path.join(&entry.path));
            if let Some(err) = err {
                // A partially-parsed ignore file is still usable; just log.
                log::error!("error in ignore file {:?} - {:?}", &entry.path, err);
            }

            let ignore_dir_path = entry.path.parent().unwrap();
            self.ignores
                .insert(ignore_dir_path.into(), (Arc::new(ignore), self.scan_id));
        }

        self.reuse_entry_id(&mut entry);
        self.entries_by_path.insert_or_replace(entry.clone(), &());
        self.entries_by_id.insert_or_replace(
            PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                scan_id: self.scan_id,
            },
            &(),
        );
        entry
    }

    /// Records the scanned children of `parent_path`, transitioning the
    /// parent from `PendingDir` to `Dir` and registering the directory's
    /// ignore file if one was found.
    ///
    /// Panics if the parent is missing or not pending — callers must have
    /// inserted it as `PendingDir` first.
    fn populate_dir(
        &mut self,
        parent_path: Arc<Path>,
        entries: impl IntoIterator<Item = Entry>,
        ignore: Option<Arc<Gitignore>>,
    ) {
        let mut parent_entry = self
            .entries_by_path
            .get(&PathKey(parent_path.clone()), &())
            .unwrap()
            .clone();
        if let Some(ignore) = ignore {
            self.ignores.insert(parent_path, (ignore, self.scan_id));
        }
        if matches!(parent_entry.kind, EntryKind::PendingDir) {
            parent_entry.kind = EntryKind::Dir;
        } else {
            unreachable!();
        }

        let mut entries_by_path_edits = vec![Edit::Insert(parent_entry)];
        let mut entries_by_id_edits = Vec::new();

        for mut entry in entries {
            self.reuse_entry_id(&mut entry);
            entries_by_id_edits.push(Edit::Insert(PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                scan_id: self.scan_id,
            }));
            entries_by_path_edits.push(Edit::Insert(entry));
        }

        self.entries_by_path.edit(entries_by_path_edits, &());
        self.entries_by_id.edit(entries_by_id_edits, &());
    }

    /// Keeps entry ids stable across rescans: prefer the id of a recently
    /// removed entry with the same inode (rename detection), otherwise the id
    /// of an existing entry at the same path.
    fn reuse_entry_id(&mut self, entry: &mut Entry) {
        if let Some(removed_entry_id) = self.removed_entry_ids.remove(&entry.inode) {
            entry.id = removed_entry_id;
        } else if let Some(existing_entry) = self.entry_for_path(&entry.path) {
            entry.id = existing_entry.id;
        }
    }

    /// Removes `path` and everything beneath it from both indices, remembering
    /// removed ids by inode so they can be reused if the files reappear.
    fn remove_path(&mut self, path: &Path) {
        let mut new_entries;
        let removed_entry_ids;
        {
            let mut cursor = self.entries_by_path.cursor::<_, ()>();
            // Keep everything before `path`, collect the subtree rooted at
            // `path`, then keep everything after it.
            new_entries = cursor.slice(&PathSearch::Exact(path), Bias::Left, &());
            removed_entry_ids = cursor.slice(&PathSearch::Successor(path), Bias::Left, &());
            new_entries.push_tree(cursor.suffix(&()), &());
        }
        self.entries_by_path = new_entries;

        let mut entries_by_id_edits = Vec::new();
        for entry in removed_entry_ids.cursor::<(), ()>() {
            // Per inode, remember the largest removed id for later reuse.
            let removed_entry_id = self
                .removed_entry_ids
                .entry(entry.inode)
                .or_insert(entry.id);
            *removed_entry_id = cmp::max(*removed_entry_id, entry.id);
            entries_by_id_edits.push(Edit::Remove(entry.id));
        }
        self.entries_by_id.edit(entries_by_id_edits, &());

        // Deleting a .gitignore invalidates its directory's cached ignore;
        // stamp it so update_ignore_statuses reprocesses that directory.
        if path.file_name() == Some(&GITIGNORE) {
            if let Some((_, scan_id)) = self.ignores.get_mut(path.parent().unwrap()) {
                *scan_id = self.scan_id;
            }
        }
    }

    /// Reconstructs the stack of gitignore files governing `path` by walking
    /// its ancestors root-downward. Short-circuits to `IgnoreStack::all()` as
    /// soon as some ancestor (or the path itself) is ignored.
    fn ignore_stack_for_path(&self, path: &Path, is_dir: bool) -> Arc<IgnoreStack> {
        let mut new_ignores = Vec::new();
        for ancestor in path.ancestors().skip(1) {
            if let Some((ignore, _)) = self.ignores.get(ancestor) {
                new_ignores.push((ancestor, Some(ignore.clone())));
            } else {
                new_ignores.push((ancestor, None));
            }
        }

        let mut ignore_stack = IgnoreStack::none();
        for (parent_path, ignore) in new_ignores.into_iter().rev() {
            if ignore_stack.is_path_ignored(&parent_path, true) {
                ignore_stack = IgnoreStack::all();
                break;
            } else if let Some(ignore) = ignore {
                ignore_stack = ignore_stack.append(Arc::from(parent_path), ignore);
            }
        }

        if ignore_stack.is_path_ignored(path, is_dir) {
            ignore_stack = IgnoreStack::all();
        }

        ignore_stack
    }
}
1577
// Renders the tree as one line per entry, indented by path depth, with the
// entry's inode — useful in test failures and logs.
impl fmt::Debug for Snapshot {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        for entry in self.entries_by_path.cursor::<(), ()>() {
            // One indent unit per ancestor (excluding the path itself).
            for _ in entry.path.ancestors().skip(1) {
                write!(f, " ")?;
            }
            writeln!(f, "{:?} (inode: {})", entry.path, entry.inode)?;
        }
        Ok(())
    }
}
1589
/// A handle to a file within a worktree, as seen by an open buffer.
#[derive(Clone, PartialEq)]
pub struct File {
    // `None` once the underlying entry has been deleted from disk.
    entry_id: Option<usize>,
    worktree: ModelHandle<Worktree>,
    // Path relative to the worktree root.
    pub path: Arc<Path>,
    // Modification time observed when the file was last read or saved.
    pub mtime: SystemTime,
}
1597
impl File {
    /// Creates a handle to an existing (non-deleted) worktree entry.
    pub fn new(
        entry_id: usize,
        worktree: ModelHandle<Worktree>,
        path: Arc<Path>,
        mtime: SystemTime,
    ) -> Self {
        Self {
            entry_id: Some(entry_id),
            worktree,
            path,
            mtime,
        }
    }

    /// Broadcasts a buffer operation to collaborators, if this worktree is
    /// shared (local with a remote id) or remote. On failure the operation is
    /// queued on the worktree for later retransmission.
    pub fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) {
        self.worktree.update(cx, |worktree, cx| {
            if let Some((rpc, remote_id)) = match worktree {
                Worktree::Local(worktree) => worktree
                    .remote_id
                    .borrow()
                    .map(|id| (worktree.rpc.clone(), id)),
                Worktree::Remote(worktree) => Some((worktree.rpc.clone(), worktree.remote_id)),
            } {
                cx.spawn(|worktree, mut cx| async move {
                    if let Err(error) = rpc
                        .request(proto::UpdateBuffer {
                            worktree_id: remote_id,
                            buffer_id,
                            operations: vec![(&operation).into()],
                        })
                        .await
                    {
                        // Keep the operation for a future retry rather than
                        // dropping it.
                        worktree.update(&mut cx, |worktree, _| {
                            log::error!("error sending buffer operation: {}", error);
                            match worktree {
                                Worktree::Local(t) => &mut t.queued_operations,
                                Worktree::Remote(t) => &mut t.queued_operations,
                            }
                            .push((buffer_id, operation));
                        });
                    }
                })
                .detach();
            }
        });
    }

    /// Notifies the host that this client closed a remote buffer. No-op for
    /// local worktrees; errors are logged, not surfaced.
    pub fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) {
        self.worktree.update(cx, |worktree, cx| {
            if let Worktree::Remote(worktree) = worktree {
                let worktree_id = worktree.remote_id;
                let rpc = worktree.rpc.clone();
                cx.background()
                    .spawn(async move {
                        if let Err(error) = rpc
                            .send(proto::CloseBuffer {
                                worktree_id,
                                buffer_id,
                            })
                            .await
                        {
                            log::error!("error closing remote buffer: {}", error);
                        }
                    })
                    .detach();
            }
        });
    }

    /// Returns this file's path relative to the root of its worktree.
    pub fn path(&self) -> Arc<Path> {
        self.path.clone()
    }

    pub fn abs_path(&self, cx: &AppContext) -> PathBuf {
        self.worktree.read(cx).abs_path.join(&self.path)
    }

    /// Returns the last component of this handle's absolute path. If this handle refers to the root
    /// of its worktree, then this method will return the name of the worktree itself.
    pub fn file_name<'a>(&'a self, cx: &'a AppContext) -> Option<OsString> {
        self.path
            .file_name()
            .or_else(|| Some(OsStr::new(self.worktree.read(cx).root_name())))
            .map(Into::into)
    }

    pub fn is_deleted(&self) -> bool {
        self.entry_id.is_none()
    }

    pub fn exists(&self) -> bool {
        !self.is_deleted()
    }

    /// Persists `text` for this file.
    ///
    /// Local worktrees write to disk directly and, if shared, notify peers
    /// with a `BufferSaved` message. Remote worktrees ask the host to save
    /// via a `SaveBuffer` request. Resolves to the saved version vector and
    /// the resulting mtime.
    pub fn save(
        &self,
        buffer_id: u64,
        text: Rope,
        version: time::Global,
        cx: &mut MutableAppContext,
    ) -> Task<Result<(time::Global, SystemTime)>> {
        self.worktree.update(cx, |worktree, cx| match worktree {
            Worktree::Local(worktree) => {
                let rpc = worktree.rpc.clone();
                let worktree_id = *worktree.remote_id.borrow();
                let save = worktree.save(self.path.clone(), text, cx);
                cx.background().spawn(async move {
                    let entry = save.await?;
                    // Only broadcast if the worktree is currently shared.
                    if let Some(worktree_id) = worktree_id {
                        rpc.send(proto::BufferSaved {
                            worktree_id,
                            buffer_id,
                            version: (&version).into(),
                            mtime: Some(entry.mtime.into()),
                        })
                        .await?;
                    }
                    Ok((version, entry.mtime))
                })
            }
            Worktree::Remote(worktree) => {
                let rpc = worktree.rpc.clone();
                let worktree_id = worktree.remote_id;
                cx.foreground().spawn(async move {
                    let response = rpc
                        .request(proto::SaveBuffer {
                            worktree_id,
                            buffer_id,
                        })
                        .await?;
                    let version = response.version.try_into()?;
                    let mtime = response
                        .mtime
                        .ok_or_else(|| anyhow!("missing mtime"))?
                        .into();
                    Ok((version, mtime))
                })
            }
        })
    }

    pub fn worktree_id(&self) -> usize {
        self.worktree.id()
    }

    /// A key identifying this file: (worktree model id, relative path).
    pub fn entry_id(&self) -> (usize, Arc<Path>) {
        (self.worktree.id(), self.path.clone())
    }
}
1749
/// A single file or directory within a worktree snapshot.
#[derive(Clone, Debug)]
pub struct Entry {
    // Stable id, reused across renames when the inode matches.
    pub id: usize,
    pub kind: EntryKind,
    // Path relative to the worktree root.
    pub path: Arc<Path>,
    pub inode: u64,
    pub mtime: SystemTime,
    pub is_symlink: bool,
    // True when a .gitignore rule excludes this path.
    pub is_ignored: bool,
}
1760
/// What an entry is: a directory awaiting its first scan, a scanned
/// directory, or a file (carrying its fuzzy-match character bag).
#[derive(Clone, Debug)]
pub enum EntryKind {
    PendingDir,
    Dir,
    File(CharBag),
}
1767
1768impl Entry {
1769 fn new(
1770 path: Arc<Path>,
1771 metadata: &fs::Metadata,
1772 next_entry_id: &AtomicUsize,
1773 root_char_bag: CharBag,
1774 ) -> Self {
1775 Self {
1776 id: next_entry_id.fetch_add(1, SeqCst),
1777 kind: if metadata.is_dir {
1778 EntryKind::PendingDir
1779 } else {
1780 EntryKind::File(char_bag_for_path(root_char_bag, &path))
1781 },
1782 path,
1783 inode: metadata.inode,
1784 mtime: metadata.mtime,
1785 is_symlink: metadata.is_symlink,
1786 is_ignored: false,
1787 }
1788 }
1789
1790 pub fn is_dir(&self) -> bool {
1791 matches!(self.kind, EntryKind::Dir | EntryKind::PendingDir)
1792 }
1793
1794 pub fn is_file(&self) -> bool {
1795 matches!(self.kind, EntryKind::File(_))
1796 }
1797}
1798
1799impl sum_tree::Item for Entry {
1800 type Summary = EntrySummary;
1801
1802 fn summary(&self) -> Self::Summary {
1803 let file_count;
1804 let visible_file_count;
1805 if self.is_file() {
1806 file_count = 1;
1807 if self.is_ignored {
1808 visible_file_count = 0;
1809 } else {
1810 visible_file_count = 1;
1811 }
1812 } else {
1813 file_count = 0;
1814 visible_file_count = 0;
1815 }
1816
1817 EntrySummary {
1818 max_path: self.path.clone(),
1819 file_count,
1820 visible_file_count,
1821 }
1822 }
1823}
1824
// Entries are keyed by their worktree-relative path.
impl sum_tree::KeyedItem for Entry {
    type Key = PathKey;

    fn key(&self) -> Self::Key {
        PathKey(self.path.clone())
    }
}
1832
/// Aggregate data maintained by the entry tree: the largest path in a
/// subtree plus running file counts, enabling seeks by path and by file
/// index.
#[derive(Clone, Debug)]
pub struct EntrySummary {
    max_path: Arc<Path>,
    file_count: usize,
    visible_file_count: usize,
}
1839
1840impl Default for EntrySummary {
1841 fn default() -> Self {
1842 Self {
1843 max_path: Arc::from(Path::new("")),
1844 file_count: 0,
1845 visible_file_count: 0,
1846 }
1847 }
1848}
1849
1850impl sum_tree::Summary for EntrySummary {
1851 type Context = ();
1852
1853 fn add_summary(&mut self, rhs: &Self, _: &()) {
1854 self.max_path = rhs.max_path.clone();
1855 self.file_count += rhs.file_count;
1856 self.visible_file_count += rhs.visible_file_count;
1857 }
1858}
1859
/// Secondary-index record mapping an entry id to its path and the scan in
/// which it was last touched.
#[derive(Clone, Debug)]
struct PathEntry {
    id: usize,
    path: Arc<Path>,
    scan_id: usize,
}
1866
impl sum_tree::Item for PathEntry {
    type Summary = PathEntrySummary;

    // The summary tracks only the maximum id in a subtree.
    fn summary(&self) -> Self::Summary {
        PathEntrySummary { max_id: self.id }
    }
}
1874
// Path entries are keyed (and ordered) by entry id.
impl sum_tree::KeyedItem for PathEntry {
    type Key = usize;

    fn key(&self) -> Self::Key {
        self.id
    }
}
1882
/// Summary for the id-keyed tree: the largest entry id in a subtree.
#[derive(Clone, Debug, Default)]
struct PathEntrySummary {
    max_id: usize,
}
1887
impl sum_tree::Summary for PathEntrySummary {
    type Context = ();

    // Ids ascend left-to-right, so the right-hand max always wins.
    fn add_summary(&mut self, summary: &Self, _: &Self::Context) {
        self.max_id = summary.max_id;
    }
}
1895
// Lets a cursor seek the id-keyed tree by plain `usize` id.
impl<'a> sum_tree::Dimension<'a, PathEntrySummary> for usize {
    fn add_summary(&mut self, summary: &'a PathEntrySummary, _: &()) {
        *self = summary.max_id;
    }
}
1901
/// Newtype key ordering entries by their worktree-relative path.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct PathKey(Arc<Path>);
1904
1905impl Default for PathKey {
1906 fn default() -> Self {
1907 Self(Path::new("").into())
1908 }
1909}
1910
// Lets a cursor seek the path-keyed tree by `PathKey`.
impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.0 = summary.max_path.clone();
    }
}
1916
/// Seek target for the path-ordered tree: `Exact` finds a path itself;
/// `Successor` finds the first path after the given path's entire subtree.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum PathSearch<'a> {
    Exact(&'a Path),
    Successor(&'a Path),
}
1922
impl<'a> Ord for PathSearch<'a> {
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        match (self, other) {
            (Self::Exact(a), Self::Exact(b)) => a.cmp(b),
            (Self::Successor(a), Self::Exact(b)) => {
                // `Successor(a)` sorts after every path inside `a`'s subtree,
                // so a descendant `b` compares less than the successor.
                if b.starts_with(a) {
                    cmp::Ordering::Greater
                } else {
                    a.cmp(b)
                }
            }
            // Cursor seeks only ever compare a search key (left) against tree
            // dimensions, which are always `Exact` — the remaining variant
            // pairs cannot occur.
            _ => unreachable!("not sure we need the other two cases"),
        }
    }
}
1938
// Delegates to the total order defined above.
impl<'a> PartialOrd for PathSearch<'a> {
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}
1944
1945impl<'a> Default for PathSearch<'a> {
1946 fn default() -> Self {
1947 Self::Exact(Path::new("").into())
1948 }
1949}
1950
// Lets a cursor seek the path-keyed tree with a `PathSearch` key; the
// accumulated dimension is always the `Exact` form of the subtree's max path.
impl<'a: 'b, 'b> sum_tree::Dimension<'a, EntrySummary> for PathSearch<'b> {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        *self = Self::Exact(summary.max_path.as_ref());
    }
}
1956
/// Dimension counting file entries, enabling seeks by file index.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct FileCount(usize);
1959
1960impl<'a> sum_tree::Dimension<'a, EntrySummary> for FileCount {
1961 fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
1962 self.0 += summary.file_count;
1963 }
1964}
1965
/// Dimension counting non-ignored file entries, enabling seeks by visible
/// file index.
#[derive(Copy, Clone, Default, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct VisibleFileCount(usize);
1968
1969impl<'a> sum_tree::Dimension<'a, EntrySummary> for VisibleFileCount {
1970 fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
1971 self.0 += summary.visible_file_count;
1972 }
1973}
1974
/// Scans a worktree on background threads, keeping the shared snapshot in
/// sync with the filesystem and reporting progress over `notify`.
struct BackgroundScanner {
    fs: Arc<dyn Fs>,
    // Snapshot shared with the foreground worktree; mutated under the lock.
    snapshot: Arc<Mutex<Snapshot>>,
    // Emits Scanning/Idle/Err transitions; a closed receiver stops the scanner.
    notify: Sender<ScanState>,
    executor: Arc<executor::Background>,
}
1981
1982impl BackgroundScanner {
1983 fn new(
1984 snapshot: Arc<Mutex<Snapshot>>,
1985 notify: Sender<ScanState>,
1986 fs: Arc<dyn Fs>,
1987 executor: Arc<executor::Background>,
1988 ) -> Self {
1989 Self {
1990 fs,
1991 snapshot,
1992 notify,
1993 executor,
1994 }
1995 }
1996
1997 fn abs_path(&self) -> Arc<Path> {
1998 self.snapshot.lock().abs_path.clone()
1999 }
2000
2001 fn snapshot(&self) -> Snapshot {
2002 self.snapshot.lock().clone()
2003 }
2004
2005 async fn run(mut self, events_rx: impl Stream<Item = Vec<fsevent::Event>>) {
2006 if self.notify.send(ScanState::Scanning).await.is_err() {
2007 return;
2008 }
2009
2010 if let Err(err) = self.scan_dirs().await {
2011 if self
2012 .notify
2013 .send(ScanState::Err(Arc::new(err)))
2014 .await
2015 .is_err()
2016 {
2017 return;
2018 }
2019 }
2020
2021 if self.notify.send(ScanState::Idle).await.is_err() {
2022 return;
2023 }
2024
2025 futures::pin_mut!(events_rx);
2026 while let Some(events) = events_rx.next().await {
2027 if self.notify.send(ScanState::Scanning).await.is_err() {
2028 break;
2029 }
2030
2031 if !self.process_events(events).await {
2032 break;
2033 }
2034
2035 if self.notify.send(ScanState::Idle).await.is_err() {
2036 break;
2037 }
2038 }
2039 }
2040
2041 async fn scan_dirs(&mut self) -> Result<()> {
2042 let root_char_bag;
2043 let next_entry_id;
2044 let is_dir;
2045 {
2046 let snapshot = self.snapshot.lock();
2047 root_char_bag = snapshot.root_char_bag;
2048 next_entry_id = snapshot.next_entry_id.clone();
2049 is_dir = snapshot.root_entry().map_or(false, |e| e.is_dir())
2050 };
2051
2052 if is_dir {
2053 let path: Arc<Path> = Arc::from(Path::new(""));
2054 let abs_path = self.abs_path();
2055 let (tx, rx) = channel::unbounded();
2056 tx.send(ScanJob {
2057 abs_path: abs_path.to_path_buf(),
2058 path,
2059 ignore_stack: IgnoreStack::none(),
2060 scan_queue: tx.clone(),
2061 })
2062 .await
2063 .unwrap();
2064 drop(tx);
2065
2066 self.executor
2067 .scoped(|scope| {
2068 for _ in 0..self.executor.num_cpus() {
2069 scope.spawn(async {
2070 while let Ok(job) = rx.recv().await {
2071 if let Err(err) = self
2072 .scan_dir(root_char_bag, next_entry_id.clone(), &job)
2073 .await
2074 {
2075 log::error!("error scanning {:?}: {}", job.abs_path, err);
2076 }
2077 }
2078 });
2079 }
2080 })
2081 .await;
2082 }
2083
2084 Ok(())
2085 }
2086
2087 async fn scan_dir(
2088 &self,
2089 root_char_bag: CharBag,
2090 next_entry_id: Arc<AtomicUsize>,
2091 job: &ScanJob,
2092 ) -> Result<()> {
2093 let mut new_entries: Vec<Entry> = Vec::new();
2094 let mut new_jobs: Vec<ScanJob> = Vec::new();
2095 let mut ignore_stack = job.ignore_stack.clone();
2096 let mut new_ignore = None;
2097
2098 let mut child_paths = self.fs.read_dir(&job.abs_path).await?;
2099 while let Some(child_abs_path) = child_paths.next().await {
2100 let child_abs_path = match child_abs_path {
2101 Ok(child_abs_path) => child_abs_path,
2102 Err(error) => {
2103 log::error!("error processing entry {:?}", error);
2104 continue;
2105 }
2106 };
2107 let child_name = child_abs_path.file_name().unwrap();
2108 let child_path: Arc<Path> = job.path.join(child_name).into();
2109 let child_metadata = match self.fs.metadata(&child_abs_path).await? {
2110 Some(metadata) => metadata,
2111 None => continue,
2112 };
2113
2114 // If we find a .gitignore, add it to the stack of ignores used to determine which paths are ignored
2115 if child_name == *GITIGNORE {
2116 let (ignore, err) = Gitignore::new(&child_abs_path);
2117 if let Some(err) = err {
2118 log::error!("error in ignore file {:?} - {:?}", child_name, err);
2119 }
2120 let ignore = Arc::new(ignore);
2121 ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
2122 new_ignore = Some(ignore);
2123
2124 // Update ignore status of any child entries we've already processed to reflect the
2125 // ignore file in the current directory. Because `.gitignore` starts with a `.`,
2126 // there should rarely be too numerous. Update the ignore stack associated with any
2127 // new jobs as well.
2128 let mut new_jobs = new_jobs.iter_mut();
2129 for entry in &mut new_entries {
2130 entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
2131 if entry.is_dir() {
2132 new_jobs.next().unwrap().ignore_stack = if entry.is_ignored {
2133 IgnoreStack::all()
2134 } else {
2135 ignore_stack.clone()
2136 };
2137 }
2138 }
2139 }
2140
2141 let mut child_entry = Entry::new(
2142 child_path.clone(),
2143 &child_metadata,
2144 &next_entry_id,
2145 root_char_bag,
2146 );
2147
2148 if child_metadata.is_dir {
2149 let is_ignored = ignore_stack.is_path_ignored(&child_path, true);
2150 child_entry.is_ignored = is_ignored;
2151 new_entries.push(child_entry);
2152 new_jobs.push(ScanJob {
2153 abs_path: child_abs_path,
2154 path: child_path,
2155 ignore_stack: if is_ignored {
2156 IgnoreStack::all()
2157 } else {
2158 ignore_stack.clone()
2159 },
2160 scan_queue: job.scan_queue.clone(),
2161 });
2162 } else {
2163 child_entry.is_ignored = ignore_stack.is_path_ignored(&child_path, false);
2164 new_entries.push(child_entry);
2165 };
2166 }
2167
2168 self.snapshot
2169 .lock()
2170 .populate_dir(job.path.clone(), new_entries, new_ignore);
2171 for new_job in new_jobs {
2172 job.scan_queue.send(new_job).await.unwrap();
2173 }
2174
2175 Ok(())
2176 }
2177
2178 async fn process_events(&mut self, mut events: Vec<fsevent::Event>) -> bool {
2179 let mut snapshot = self.snapshot();
2180 snapshot.scan_id += 1;
2181
2182 let root_abs_path = if let Ok(abs_path) = self.fs.canonicalize(&snapshot.abs_path).await {
2183 abs_path
2184 } else {
2185 return false;
2186 };
2187 let root_char_bag = snapshot.root_char_bag;
2188 let next_entry_id = snapshot.next_entry_id.clone();
2189
2190 events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
2191 events.dedup_by(|a, b| a.path.starts_with(&b.path));
2192
2193 for event in &events {
2194 match event.path.strip_prefix(&root_abs_path) {
2195 Ok(path) => snapshot.remove_path(&path),
2196 Err(_) => {
2197 log::error!(
2198 "unexpected event {:?} for root path {:?}",
2199 event.path,
2200 root_abs_path
2201 );
2202 continue;
2203 }
2204 }
2205 }
2206
2207 let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
2208 for event in events {
2209 let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) {
2210 Ok(path) => Arc::from(path.to_path_buf()),
2211 Err(_) => {
2212 log::error!(
2213 "unexpected event {:?} for root path {:?}",
2214 event.path,
2215 root_abs_path
2216 );
2217 continue;
2218 }
2219 };
2220
2221 match self.fs.metadata(&event.path).await {
2222 Ok(Some(metadata)) => {
2223 let ignore_stack = snapshot.ignore_stack_for_path(&path, metadata.is_dir);
2224 let mut fs_entry = Entry::new(
2225 path.clone(),
2226 &metadata,
2227 snapshot.next_entry_id.as_ref(),
2228 snapshot.root_char_bag,
2229 );
2230 fs_entry.is_ignored = ignore_stack.is_all();
2231 snapshot.insert_entry(fs_entry);
2232 if metadata.is_dir {
2233 scan_queue_tx
2234 .send(ScanJob {
2235 abs_path: event.path,
2236 path,
2237 ignore_stack,
2238 scan_queue: scan_queue_tx.clone(),
2239 })
2240 .await
2241 .unwrap();
2242 }
2243 }
2244 Ok(None) => {}
2245 Err(err) => {
2246 // TODO - create a special 'error' entry in the entries tree to mark this
2247 log::error!("error reading file on event {:?}", err);
2248 }
2249 }
2250 }
2251
2252 *self.snapshot.lock() = snapshot;
2253
2254 // Scan any directories that were created as part of this event batch.
2255 drop(scan_queue_tx);
2256 self.executor
2257 .scoped(|scope| {
2258 for _ in 0..self.executor.num_cpus() {
2259 scope.spawn(async {
2260 while let Ok(job) = scan_queue_rx.recv().await {
2261 if let Err(err) = self
2262 .scan_dir(root_char_bag, next_entry_id.clone(), &job)
2263 .await
2264 {
2265 log::error!("error scanning {:?}: {}", job.abs_path, err);
2266 }
2267 }
2268 });
2269 }
2270 })
2271 .await;
2272
2273 // Attempt to detect renames only over a single batch of file-system events.
2274 self.snapshot.lock().removed_entry_ids.clear();
2275
2276 self.update_ignore_statuses().await;
2277 true
2278 }
2279
    /// Recomputes `is_ignored` for entries affected by `.gitignore` files that
    /// changed during the current scan, and drops cached ignores whose
    /// `.gitignore` file no longer exists on disk.
    async fn update_ignore_statuses(&self) {
        // Work against a local copy of the snapshot; the shared snapshot is
        // only locked briefly below when applying changes.
        let mut snapshot = self.snapshot();

        let mut ignores_to_update = Vec::new();
        let mut ignores_to_delete = Vec::new();
        for (parent_path, (_, scan_id)) in &snapshot.ignores {
            // A gitignore whose scan_id equals the current scan's id was
            // (re)loaded in this scan, so its subtree needs re-evaluation.
            if *scan_id == snapshot.scan_id && snapshot.entry_for_path(parent_path).is_some() {
                ignores_to_update.push(parent_path.clone());
            }

            // If the `.gitignore` file itself is gone, the cached entry is stale.
            let ignore_path = parent_path.join(&*GITIGNORE);
            if snapshot.entry_for_path(ignore_path).is_none() {
                ignores_to_delete.push(parent_path.clone());
            }
        }

        for parent_path in ignores_to_delete {
            snapshot.ignores.remove(&parent_path);
            self.snapshot.lock().ignores.remove(&parent_path);
        }

        let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
        ignores_to_update.sort_unstable();
        let mut ignores_to_update = ignores_to_update.into_iter().peekable();
        while let Some(parent_path) = ignores_to_update.next() {
            // Drop any paths that are descendants of `parent_path`: the job
            // for an ancestor directory recursively enqueues work for its
            // child directories, so queuing them here would duplicate work.
            while ignores_to_update
                .peek()
                .map_or(false, |p| p.starts_with(&parent_path))
            {
                ignores_to_update.next().unwrap();
            }

            let ignore_stack = snapshot.ignore_stack_for_path(&parent_path, true);
            ignore_queue_tx
                .send(UpdateIgnoreStatusJob {
                    path: parent_path,
                    ignore_stack,
                    ignore_queue: ignore_queue_tx.clone(),
                })
                .await
                .unwrap();
        }
        // Close our sender so the workers below terminate once the queue
        // (including jobs enqueued by other jobs) fully drains.
        drop(ignore_queue_tx);

        // Process ignore-status jobs in parallel, one worker per CPU.
        self.executor
            .scoped(|scope| {
                for _ in 0..self.executor.num_cpus() {
                    scope.spawn(async {
                        while let Ok(job) = ignore_queue_rx.recv().await {
                            self.update_ignore_status(job, &snapshot).await;
                        }
                    });
                }
            })
            .await;
    }
2336
    /// Applies `job.ignore_stack` to the direct children of `job.path`,
    /// rewriting their `is_ignored` flags in the shared snapshot and
    /// enqueueing a follow-up job for each child directory.
    async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) {
        let mut ignore_stack = job.ignore_stack;
        // Include this directory's own cached `.gitignore`, if any, so it
        // applies to the children evaluated below.
        if let Some((ignore, _)) = snapshot.ignores.get(&job.path) {
            ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
        }

        let mut edits = Vec::new();
        for mut entry in snapshot.child_entries(&job.path).cloned() {
            let was_ignored = entry.is_ignored;
            entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
            if entry.is_dir() {
                // Once a directory is ignored everything beneath it is too,
                // so descend with an "ignore everything" stack.
                let child_ignore_stack = if entry.is_ignored {
                    IgnoreStack::all()
                } else {
                    ignore_stack.clone()
                };
                job.ignore_queue
                    .send(UpdateIgnoreStatusJob {
                        path: entry.path.clone(),
                        ignore_stack: child_ignore_stack,
                        ignore_queue: job.ignore_queue.clone(),
                    })
                    .await
                    .unwrap();
            }

            // Only write back entries whose ignored status actually changed.
            if entry.is_ignored != was_ignored {
                edits.push(Edit::Insert(entry));
            }
        }
        self.snapshot.lock().entries_by_path.edit(edits, &());
    }
2369}
2370
2371async fn refresh_entry(
2372 fs: &dyn Fs,
2373 snapshot: &Mutex<Snapshot>,
2374 path: Arc<Path>,
2375 abs_path: &Path,
2376) -> Result<Entry> {
2377 let root_char_bag;
2378 let next_entry_id;
2379 {
2380 let snapshot = snapshot.lock();
2381 root_char_bag = snapshot.root_char_bag;
2382 next_entry_id = snapshot.next_entry_id.clone();
2383 }
2384 let entry = Entry::new(
2385 path,
2386 &fs.metadata(abs_path)
2387 .await?
2388 .ok_or_else(|| anyhow!("could not read saved file metadata"))?,
2389 &next_entry_id,
2390 root_char_bag,
2391 );
2392 Ok(snapshot.lock().insert_entry(entry))
2393}
2394
2395fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
2396 let mut result = root_char_bag;
2397 result.extend(
2398 path.to_string_lossy()
2399 .chars()
2400 .map(|c| c.to_ascii_lowercase()),
2401 );
2402 result
2403}
2404
/// A unit of work for the parallel directory scan: one directory to read,
/// along with the ignore state inherited from its ancestors.
struct ScanJob {
    // Absolute path of the directory to scan.
    abs_path: PathBuf,
    // The directory's path relative to the worktree root.
    path: Arc<Path>,
    // Gitignore rules in effect for this directory's ancestors.
    ignore_stack: Arc<IgnoreStack>,
    // Used to enqueue jobs for subdirectories discovered during the scan.
    scan_queue: Sender<ScanJob>,
}
2411
/// A unit of work for re-evaluating ignore statuses: one directory whose
/// children should be re-checked against the given ignore stack.
struct UpdateIgnoreStatusJob {
    // The directory whose direct children will be re-evaluated.
    path: Arc<Path>,
    // Gitignore rules in effect for this directory's ancestors.
    ignore_stack: Arc<IgnoreStack>,
    // Used to enqueue follow-up jobs for child directories.
    ignore_queue: Sender<UpdateIgnoreStatusJob>,
}
2417
/// Test-only extension methods on `ModelHandle<Worktree>`.
pub trait WorktreeHandle {
    /// Waits until all file-system events delivered so far have been
    /// processed by the worktree's background scanner.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()>;
}
2425
impl WorktreeHandle for ModelHandle<Worktree> {
    // The worktree's FS event stream sometimes delivers "redundant" events for FS changes that
    // occurred before the worktree was constructed. These events can cause the worktree to perform
    // extra directory scans, and emit extra scan-state notifications.
    //
    // This function mutates the worktree's directory and waits for those mutations to be picked up,
    // to ensure that all redundant FS events have already been processed.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()> {
        use smol::future::FutureExt;

        // Create and then delete a sentinel file, waiting for each change to
        // show up in the worktree. By the time both round trips complete, any
        // previously queued FS events must also have been processed.
        let filename = "fs-event-sentinel";
        let root_path = cx.read(|cx| self.read(cx).abs_path.clone());
        let tree = self.clone();
        async move {
            std::fs::write(root_path.join(filename), "").unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_some())
                .await;

            std::fs::remove_file(root_path.join(filename)).unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_none())
                .await;

            cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
                .await;
        }
        .boxed_local()
    }
}
2458
/// Iterator over the file entries of a snapshot — either every file, or only
/// the files that aren't ignored.
pub enum FileIter<'a> {
    All(Cursor<'a, Entry, FileCount, ()>),
    Visible(Cursor<'a, Entry, VisibleFileCount, ()>),
}
2463
impl<'a> FileIter<'a> {
    /// Iterator over every file, starting at the `start`-th file.
    fn all(snapshot: &'a Snapshot, start: usize) -> Self {
        let mut cursor = snapshot.entries_by_path.cursor();
        cursor.seek(&FileCount(start), Bias::Right, &());
        Self::All(cursor)
    }

    /// Iterator over non-ignored files, starting at the `start`-th visible file.
    fn visible(snapshot: &'a Snapshot, start: usize) -> Self {
        let mut cursor = snapshot.entries_by_path.cursor();
        cursor.seek(&VisibleFileCount(start), Bias::Right, &());
        Self::Visible(cursor)
    }

    // Advances the cursor to the next (visible) file by seeking forward to a
    // file count one greater than the current position.
    fn next_internal(&mut self) {
        match self {
            Self::All(cursor) => {
                let ix = *cursor.seek_start();
                cursor.seek_forward(&FileCount(ix.0 + 1), Bias::Right, &());
            }
            Self::Visible(cursor) => {
                let ix = *cursor.seek_start();
                cursor.seek_forward(&VisibleFileCount(ix.0 + 1), Bias::Right, &());
            }
        }
    }

    // The entry at the cursor's current position, if any.
    fn item(&self) -> Option<&'a Entry> {
        match self {
            Self::All(cursor) => cursor.item(),
            Self::Visible(cursor) => cursor.item(),
        }
    }
}
2497
2498impl<'a> Iterator for FileIter<'a> {
2499 type Item = &'a Entry;
2500
2501 fn next(&mut self) -> Option<Self::Item> {
2502 if let Some(entry) = self.item() {
2503 self.next_internal();
2504 Some(entry)
2505 } else {
2506 None
2507 }
2508 }
2509}
2510
/// Iterator over the direct children of a directory entry in a snapshot.
struct ChildEntriesIter<'a> {
    // The directory whose children are being iterated.
    parent_path: &'a Path,
    // Cursor over the snapshot's path-ordered entry tree.
    cursor: Cursor<'a, Entry, PathSearch<'a>, ()>,
}
2515
2516impl<'a> ChildEntriesIter<'a> {
2517 fn new(parent_path: &'a Path, snapshot: &'a Snapshot) -> Self {
2518 let mut cursor = snapshot.entries_by_path.cursor();
2519 cursor.seek(&PathSearch::Exact(parent_path), Bias::Right, &());
2520 Self {
2521 parent_path,
2522 cursor,
2523 }
2524 }
2525}
2526
2527impl<'a> Iterator for ChildEntriesIter<'a> {
2528 type Item = &'a Entry;
2529
2530 fn next(&mut self) -> Option<Self::Item> {
2531 if let Some(item) = self.cursor.item() {
2532 if item.path.starts_with(self.parent_path) {
2533 self.cursor
2534 .seek_forward(&PathSearch::Successor(&item.path), Bias::Left, &());
2535 Some(item)
2536 } else {
2537 None
2538 }
2539 } else {
2540 None
2541 }
2542 }
2543}
2544
2545impl<'a> From<&'a Entry> for proto::Entry {
2546 fn from(entry: &'a Entry) -> Self {
2547 Self {
2548 id: entry.id as u64,
2549 is_dir: entry.is_dir(),
2550 path: entry.path.to_string_lossy().to_string(),
2551 inode: entry.inode,
2552 mtime: Some(entry.mtime.into()),
2553 is_symlink: entry.is_symlink,
2554 is_ignored: entry.is_ignored,
2555 }
2556 }
2557}
2558
2559impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
2560 type Error = anyhow::Error;
2561
2562 fn try_from((root_char_bag, entry): (&'a CharBag, proto::Entry)) -> Result<Self> {
2563 if let Some(mtime) = entry.mtime {
2564 let kind = if entry.is_dir {
2565 EntryKind::Dir
2566 } else {
2567 let mut char_bag = root_char_bag.clone();
2568 char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase()));
2569 EntryKind::File(char_bag)
2570 };
2571 let path: Arc<Path> = Arc::from(Path::new(&entry.path));
2572 Ok(Entry {
2573 id: entry.id as usize,
2574 kind,
2575 path: path.clone(),
2576 inode: entry.inode,
2577 mtime: mtime.into(),
2578 is_symlink: entry.is_symlink,
2579 is_ignored: entry.is_ignored,
2580 })
2581 } else {
2582 Err(anyhow!(
2583 "missing mtime in remote worktree entry {:?}",
2584 entry.path
2585 ))
2586 }
2587 }
2588}
2589
2590#[cfg(test)]
2591mod tests {
2592 use super::*;
2593 use crate::fs::FakeFs;
2594 use crate::test::*;
2595 use anyhow::Result;
2596 use fs::RealFs;
2597 use rand::prelude::*;
2598 use serde_json::json;
2599 use std::time::UNIX_EPOCH;
2600 use std::{env, fmt::Write, os::unix, time::SystemTime};
2601
    // Opening a worktree through a symlinked root should scan every entry
    // (following directory symlinks), and fuzzy path matching should find
    // files whose paths contain the query as a subsequence.
    #[gpui::test]
    async fn test_populate_and_search(cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "root": {
                "apple": "",
                "banana": {
                    "carrot": {
                        "date": "",
                        "endive": "",
                    }
                },
                "fennel": {
                    "grape": "",
                }
            }
        }));

        // The worktree root is itself a symlink to "root", and "finnochio" is
        // a symlink to the "fennel" directory inside the tree.
        let root_link_path = dir.path().join("root_link");
        unix::fs::symlink(&dir.path().join("root"), &root_link_path).unwrap();
        unix::fs::symlink(
            &dir.path().join("root/fennel"),
            &dir.path().join("root/finnochio"),
        )
        .unwrap();

        let tree = Worktree::open_local(
            rpc::Client::new(),
            root_link_path,
            Arc::new(RealFs),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        let snapshots = [cx.read(|cx| {
            let tree = tree.read(cx);
            assert_eq!(tree.file_count(), 5);
            // The symlinked directory resolves to the same inode as its target.
            assert_eq!(
                tree.inode_for_path("fennel/grape"),
                tree.inode_for_path("finnochio/grape")
            );
            tree.snapshot()
        })];
        let cancel_flag = Default::default();
        let results = cx
            .read(|cx| {
                match_paths(
                    &snapshots,
                    "bna",
                    false,
                    false,
                    10,
                    &cancel_flag,
                    cx.background().clone(),
                )
            })
            .await;
        // "bna" matches "banana/..." paths by subsequence.
        assert_eq!(
            results
                .into_iter()
                .map(|result| result.path)
                .collect::<Vec<Arc<Path>>>(),
            vec![
                PathBuf::from("banana/carrot/date").into(),
                PathBuf::from("banana/carrot/endive").into(),
            ]
        );
    }
2673
    // Fuzzy matching over a worktree that contains only directories should
    // return no results, even when the query matches directory names.
    #[gpui::test]
    async fn test_search_worktree_without_files(cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "root": {
                "dir1": {},
                "dir2": {
                    "dir3": {}
                }
            }
        }));
        let tree = Worktree::open_local(
            rpc::Client::new(),
            dir.path(),
            Arc::new(RealFs),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        let snapshots = [cx.read(|cx| {
            let tree = tree.read(cx);
            assert_eq!(tree.file_count(), 0);
            tree.snapshot()
        })];
        let cancel_flag = Default::default();
        let results = cx
            .read(|cx| {
                match_paths(
                    &snapshots,
                    "dir",
                    false,
                    false,
                    10,
                    &cancel_flag,
                    cx.background().clone(),
                )
            })
            .await;
        // Only files are candidates for path matching, so nothing matches.
        assert_eq!(
            results
                .into_iter()
                .map(|result| result.path)
                .collect::<Vec<Arc<Path>>>(),
            vec![]
        );
    }
2723
    // Saving a modified buffer should write its full contents back to the
    // underlying file on disk.
    #[gpui::test]
    async fn test_save_file(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));
        let tree = Worktree::open_local(
            rpc::Client::new(),
            dir.path(),
            Arc::new(RealFs),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        let save = buffer.update(&mut cx, |buffer, cx| {
            // Insert a large amount of text so the save is non-trivial.
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        let new_text = std::fs::read_to_string(dir.path().join("file1")).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
2751
    // A worktree rooted at a single file (rather than a directory) should
    // support opening that file via the empty relative path and saving it.
    #[gpui::test]
    async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));
        let file_path = dir.path().join("file1");

        let tree = Worktree::open_local(
            rpc::Client::new(),
            file_path.clone(),
            Arc::new(RealFs),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        cx.read(|cx| assert_eq!(tree.read(cx).file_count(), 1));

        // The root file is addressed by the empty path.
        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("", cx))
            .await
            .unwrap();
        let save = buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        let new_text = std::fs::read_to_string(file_path).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
2785
    // After files are renamed, moved, and deleted on disk, the rescanned
    // worktree should preserve entry ids across renames, keep open buffers
    // pointing at their new paths (marking deleted ones), and produce an
    // update that brings a remote replica of the worktree into an identical
    // state.
    #[gpui::test]
    async fn test_rescan_and_remote_updates(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "a": {
                "file1": "",
                "file2": "",
                "file3": "",
            },
            "b": {
                "c": {
                    "file4": "",
                    "file5": "",
                }
            }
        }));

        let tree = Worktree::open_local(
            rpc::Client::new(),
            dir.path(),
            Arc::new(RealFs),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        // Helper closures for opening buffers and looking up entry ids.
        let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| {
            let buffer = tree.update(cx, |tree, cx| tree.open_buffer(path, cx));
            async move { buffer.await.unwrap() }
        };
        let id_for_path = |path: &'static str, cx: &gpui::TestAppContext| {
            tree.read_with(cx, |tree, _| {
                tree.entry_for_path(path)
                    .expect(&format!("no entry for path {}", path))
                    .id
            })
        };

        let buffer2 = buffer_for_path("a/file2", &mut cx).await;
        let buffer3 = buffer_for_path("a/file3", &mut cx).await;
        let buffer4 = buffer_for_path("b/c/file4", &mut cx).await;
        let buffer5 = buffer_for_path("b/c/file5", &mut cx).await;

        let file2_id = id_for_path("a/file2", &cx);
        let file3_id = id_for_path("a/file3", &cx);
        let file4_id = id_for_path("b/c/file4", &cx);

        // Wait for the initial scan.
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;

        // Create a remote copy of this worktree.
        let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
        let worktree_id = 1;
        let share_request = tree
            .update(&mut cx, |tree, cx| {
                tree.as_local().unwrap().share_request(cx)
            })
            .await
            .unwrap();
        let remote = Worktree::remote(
            proto::JoinWorktreeResponse {
                worktree: share_request.worktree,
                replica_id: 1,
                peers: Vec::new(),
            },
            rpc::Client::new(),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        cx.read(|cx| {
            assert!(!buffer2.read(cx).is_dirty());
            assert!(!buffer3.read(cx).is_dirty());
            assert!(!buffer4.read(cx).is_dirty());
            assert!(!buffer5.read(cx).is_dirty());
        });

        // Rename and delete files and directories.
        tree.flush_fs_events(&cx).await;
        std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
        std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
        std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap();
        std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
        tree.flush_fs_events(&cx).await;

        let expected_paths = vec![
            "a",
            "a/file1",
            "a/file2.new",
            "b",
            "d",
            "d/file3",
            "d/file4",
        ];

        cx.read(|app| {
            assert_eq!(
                tree.read(app)
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );

            // Entry ids survive renames and moves within the worktree.
            assert_eq!(id_for_path("a/file2.new", &cx), file2_id);
            assert_eq!(id_for_path("d/file3", &cx), file3_id);
            assert_eq!(id_for_path("d/file4", &cx), file4_id);

            // Open buffers track their files' new locations.
            assert_eq!(
                buffer2.read(app).file().unwrap().path().as_ref(),
                Path::new("a/file2.new")
            );
            assert_eq!(
                buffer3.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file3")
            );
            assert_eq!(
                buffer4.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file4")
            );
            assert_eq!(
                buffer5.read(app).file().unwrap().path().as_ref(),
                Path::new("b/c/file5")
            );

            // Only the buffer whose file was deleted is marked deleted.
            assert!(!buffer2.read(app).file().unwrap().is_deleted());
            assert!(!buffer3.read(app).file().unwrap().is_deleted());
            assert!(!buffer4.read(app).file().unwrap().is_deleted());
            assert!(buffer5.read(app).file().unwrap().is_deleted());
        });

        // Update the remote worktree. Check that it becomes consistent with the
        // local worktree.
        remote.update(&mut cx, |remote, cx| {
            let update_message = tree
                .read(cx)
                .snapshot()
                .build_update(&initial_snapshot, worktree_id);
            remote
                .as_remote_mut()
                .unwrap()
                .snapshot
                .apply_update(update_message)
                .unwrap();

            assert_eq!(
                remote
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );
        });
    }
2943
    // Entries under a gitignored directory are flagged as ignored after the
    // initial scan, and files created later inherit the correct ignore status
    // on rescan. The `.git` directory is also expected to be flagged ignored.
    #[gpui::test]
    async fn test_rescan_with_gitignore(cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            ".git": {},
            ".gitignore": "ignored-dir\n",
            "tracked-dir": {
                "tracked-file1": "tracked contents",
            },
            "ignored-dir": {
                "ignored-file1": "ignored contents",
            }
        }));

        let tree = Worktree::open_local(
            rpc::Client::new(),
            dir.path(),
            Arc::new(RealFs),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        tree.flush_fs_events(&cx).await;
        cx.read(|cx| {
            let tree = tree.read(cx);
            let tracked = tree.entry_for_path("tracked-dir/tracked-file1").unwrap();
            let ignored = tree.entry_for_path("ignored-dir/ignored-file1").unwrap();
            assert_eq!(tracked.is_ignored, false);
            assert_eq!(ignored.is_ignored, true);
        });

        // Create new files in both directories and verify the ignore status
        // is recomputed for them on rescan.
        std::fs::write(dir.path().join("tracked-dir/tracked-file2"), "").unwrap();
        std::fs::write(dir.path().join("ignored-dir/ignored-file2"), "").unwrap();
        tree.flush_fs_events(&cx).await;
        cx.read(|cx| {
            let tree = tree.read(cx);
            let dot_git = tree.entry_for_path(".git").unwrap();
            let tracked = tree.entry_for_path("tracked-dir/tracked-file2").unwrap();
            let ignored = tree.entry_for_path("ignored-dir/ignored-file2").unwrap();
            assert_eq!(tracked.is_ignored, false);
            assert_eq!(ignored.is_ignored, true);
            assert_eq!(dot_git.is_ignored, true);
        });
    }
2990
    // Opening a local worktree containing a `.zed.toml` should send an
    // OpenWorktree request (carrying the collaborator list) once the client
    // connects, and a CloseWorktree request when the worktree is dropped.
    #[gpui::test]
    async fn test_open_and_share_worktree(mut cx: gpui::TestAppContext) {
        let user_id = 100;
        let mut client = rpc::Client::new();
        let server = FakeServer::for_client(user_id, &mut client, &cx).await;

        let fs = Arc::new(FakeFs::new());
        fs.insert_tree(
            "/path",
            json!({
                "to": {
                    "the-dir": {
                        ".zed.toml": r#"collaborators = ["friend-1", "friend-2"]"#,
                        "a.txt": "a-contents",
                    },
                },
            }),
        )
        .await;

        let worktree = Worktree::open_local(
            client.clone(),
            "/path/to/the-dir".as_ref(),
            fs,
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        {
            let cx = cx.to_async();
            client.authenticate_and_connect(&cx).await.unwrap();
        }

        // The OpenWorktree request carries the root name and the collaborator
        // logins parsed from `.zed.toml`.
        let open_worktree = server.receive::<proto::OpenWorktree>().await.unwrap();
        assert_eq!(
            open_worktree.payload,
            proto::OpenWorktree {
                root_name: "the-dir".to_string(),
                collaborator_logins: vec!["friend-1".to_string(), "friend-2".to_string()],
            }
        );

        server
            .respond(
                open_worktree.receipt(),
                proto::OpenWorktreeResponse { worktree_id: 5 },
            )
            .await;
        let remote_id = worktree
            .update(&mut cx, |tree, _| tree.as_local().unwrap().next_remote_id())
            .await;
        assert_eq!(remote_id, Some(5));

        // Dropping the worktree triggers a CloseWorktree request.
        cx.update(move |_| drop(worktree));
        server.receive::<proto::CloseWorktree>().await.unwrap();
    }
3049
    // Randomized test: applies random FS mutations while delivering the
    // resulting events to a background scanner in randomly-sized batches,
    // checking snapshot invariants throughout. Finally verifies that a fresh
    // scan of the mutated tree produces an identical snapshot.
    #[gpui::test(iterations = 100)]
    fn test_random(mut rng: StdRng) {
        let operations = env::var("OPERATIONS")
            .map(|o| o.parse().unwrap())
            .unwrap_or(40);
        let initial_entries = env::var("INITIAL_ENTRIES")
            .map(|o| o.parse().unwrap())
            .unwrap_or(20);

        let root_dir = tempdir::TempDir::new("worktree-test").unwrap();
        for _ in 0..initial_entries {
            randomly_mutate_tree(root_dir.path(), 1.0, &mut rng).unwrap();
        }
        log::info!("Generated initial tree");

        let (notify_tx, _notify_rx) = smol::channel::unbounded();
        let fs = Arc::new(RealFs);
        let next_entry_id = Arc::new(AtomicUsize::new(0));
        // Start from an empty snapshot containing only the root entry.
        let mut initial_snapshot = Snapshot {
            id: 0,
            scan_id: 0,
            abs_path: root_dir.path().into(),
            entries_by_path: Default::default(),
            entries_by_id: Default::default(),
            removed_entry_ids: Default::default(),
            ignores: Default::default(),
            root_name: Default::default(),
            root_char_bag: Default::default(),
            next_entry_id: next_entry_id.clone(),
        };
        initial_snapshot.insert_entry(Entry::new(
            Path::new("").into(),
            &smol::block_on(fs.metadata(root_dir.path()))
                .unwrap()
                .unwrap(),
            &next_entry_id,
            Default::default(),
        ));
        let mut scanner = BackgroundScanner::new(
            Arc::new(Mutex::new(initial_snapshot.clone())),
            notify_tx,
            fs.clone(),
            Arc::new(gpui::executor::Background::new()),
        );
        smol::block_on(scanner.scan_dirs()).unwrap();
        scanner.snapshot().check_invariants();

        // Interleave random mutations with randomly-sized event deliveries,
        // checking invariants after every batch the scanner processes.
        let mut events = Vec::new();
        let mut mutations_len = operations;
        while mutations_len > 1 {
            if !events.is_empty() && rng.gen_bool(0.4) {
                let len = rng.gen_range(0..=events.len());
                let to_deliver = events.drain(0..len).collect::<Vec<_>>();
                log::info!("Delivering events: {:#?}", to_deliver);
                smol::block_on(scanner.process_events(to_deliver));
                scanner.snapshot().check_invariants();
            } else {
                events.extend(randomly_mutate_tree(root_dir.path(), 0.6, &mut rng).unwrap());
                mutations_len -= 1;
            }
        }
        // Deliver any remaining events so the scanner catches up fully.
        log::info!("Quiescing: {:#?}", events);
        smol::block_on(scanner.process_events(events));
        scanner.snapshot().check_invariants();

        // A second scanner started from scratch over the mutated tree must
        // arrive at the same snapshot as the incrementally-updated one.
        let (notify_tx, _notify_rx) = smol::channel::unbounded();
        let mut new_scanner = BackgroundScanner::new(
            Arc::new(Mutex::new(initial_snapshot)),
            notify_tx,
            scanner.fs.clone(),
            scanner.executor.clone(),
        );
        smol::block_on(new_scanner.scan_dirs()).unwrap();
        assert_eq!(scanner.snapshot().to_vec(), new_scanner.snapshot().to_vec());
    }
3125
    // Applies one random mutation to the directory tree rooted at `root_path`
    // and returns synthetic FS events describing what changed. With
    // probability `insertion_probability` (or always, when the tree is just
    // the bare root) it creates a file or directory; occasionally it writes a
    // `.gitignore`; otherwise it renames or deletes an existing entry.
    fn randomly_mutate_tree(
        root_path: &Path,
        insertion_probability: f64,
        rng: &mut impl Rng,
    ) -> Result<Vec<fsevent::Event>> {
        let root_path = root_path.canonicalize().unwrap();
        let (dirs, files) = read_dir_recursive(root_path.clone());

        let mut events = Vec::new();
        let mut record_event = |path: PathBuf| {
            // Synthesize an event for `path` resembling what the FS event
            // stream would deliver.
            events.push(fsevent::Event {
                event_id: SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                flags: fsevent::StreamFlags::empty(),
                path,
            });
        };

        if (files.is_empty() && dirs.len() == 1) || rng.gen_bool(insertion_probability) {
            // Create a new file or directory under a random existing directory.
            let path = dirs.choose(rng).unwrap();
            let new_path = path.join(gen_name(rng));

            if rng.gen() {
                log::info!("Creating dir {:?}", new_path.strip_prefix(root_path)?);
                std::fs::create_dir(&new_path)?;
            } else {
                log::info!("Creating file {:?}", new_path.strip_prefix(root_path)?);
                std::fs::write(&new_path, "")?;
            }
            record_event(new_path);
        } else if rng.gen_bool(0.05) {
            // Write a `.gitignore` in a random directory, listing a random
            // subset of the files and subdirectories beneath it.
            let ignore_dir_path = dirs.choose(rng).unwrap();
            let ignore_path = ignore_dir_path.join(&*GITIGNORE);

            let (subdirs, subfiles) = read_dir_recursive(ignore_dir_path.clone());
            let files_to_ignore = {
                let len = rng.gen_range(0..=subfiles.len());
                subfiles.choose_multiple(rng, len)
            };
            let dirs_to_ignore = {
                // `subdirs` always includes `ignore_dir_path` itself, so this
                // exclusive range is non-empty.
                let len = rng.gen_range(0..subdirs.len());
                subdirs.choose_multiple(rng, len)
            };

            let mut ignore_contents = String::new();
            for path_to_ignore in files_to_ignore.chain(dirs_to_ignore) {
                write!(
                    ignore_contents,
                    "{}\n",
                    path_to_ignore
                        .strip_prefix(&ignore_dir_path)?
                        .to_str()
                        .unwrap()
                )
                .unwrap();
            }
            log::info!(
                "Creating {:?} with contents:\n{}",
                ignore_path.strip_prefix(&root_path)?,
                ignore_contents
            );
            std::fs::write(&ignore_path, ignore_contents).unwrap();
            record_event(ignore_path);
        } else {
            // Pick an existing file, or a directory other than the root, to
            // rename or delete.
            let old_path = {
                let file_path = files.choose(rng);
                let dir_path = dirs[1..].choose(rng);
                file_path.into_iter().chain(dir_path).choose(rng).unwrap()
            };

            let is_rename = rng.gen();
            if is_rename {
                // Move the entry into a directory that isn't inside it,
                // sometimes overwriting an existing directory.
                let new_path_parent = dirs
                    .iter()
                    .filter(|d| !d.starts_with(old_path))
                    .choose(rng)
                    .unwrap();

                let overwrite_existing_dir =
                    !old_path.starts_with(&new_path_parent) && rng.gen_bool(0.3);
                let new_path = if overwrite_existing_dir {
                    std::fs::remove_dir_all(&new_path_parent).ok();
                    new_path_parent.to_path_buf()
                } else {
                    new_path_parent.join(gen_name(rng))
                };

                log::info!(
                    "Renaming {:?} to {}{:?}",
                    old_path.strip_prefix(&root_path)?,
                    if overwrite_existing_dir {
                        "overwrite "
                    } else {
                        ""
                    },
                    new_path.strip_prefix(&root_path)?
                );
                std::fs::rename(&old_path, &new_path)?;
                record_event(old_path.clone());
                record_event(new_path);
            } else if old_path.is_dir() {
                let (dirs, files) = read_dir_recursive(old_path.clone());

                log::info!("Deleting dir {:?}", old_path.strip_prefix(&root_path)?);
                std::fs::remove_dir_all(&old_path).unwrap();
                // Emit events for every deleted descendant, files first.
                for file in files {
                    record_event(file);
                }
                for dir in dirs {
                    record_event(dir);
                }
            } else {
                log::info!("Deleting file {:?}", old_path.strip_prefix(&root_path)?);
                std::fs::remove_file(old_path).unwrap();
                record_event(old_path.clone());
            }
        }

        Ok(events)
    }
3248
3249 fn read_dir_recursive(path: PathBuf) -> (Vec<PathBuf>, Vec<PathBuf>) {
3250 let child_entries = std::fs::read_dir(&path).unwrap();
3251 let mut dirs = vec![path];
3252 let mut files = Vec::new();
3253 for child_entry in child_entries {
3254 let child_path = child_entry.unwrap().path();
3255 if child_path.is_dir() {
3256 let (child_dirs, child_files) = read_dir_recursive(child_path);
3257 dirs.extend(child_dirs);
3258 files.extend(child_files);
3259 } else {
3260 files.push(child_path);
3261 }
3262 }
3263 (dirs, files)
3264 }
3265
3266 fn gen_name(rng: &mut impl Rng) -> String {
3267 (0..6)
3268 .map(|_| rng.sample(rand::distributions::Alphanumeric))
3269 .map(char::from)
3270 .collect()
3271 }
3272
    impl Snapshot {
        // Asserts internal consistency of the snapshot: the file iterators
        // agree with the entry tree, traversal via `child_entries` visits the
        // same paths in the same order as the flat entry list, and every
        // cached ignore has a live directory entry and `.gitignore` file.
        fn check_invariants(&self) {
            let mut files = self.files(0);
            let mut visible_files = self.visible_files(0);
            for entry in self.entries_by_path.cursor::<(), ()>() {
                if entry.is_file() {
                    assert_eq!(files.next().unwrap().inode, entry.inode);
                    if !entry.is_ignored {
                        assert_eq!(visible_files.next().unwrap().inode, entry.inode);
                    }
                }
            }
            // The iterators must be exhausted exactly when the entries are.
            assert!(files.next().is_none());
            assert!(visible_files.next().is_none());

            // Walk the tree through `child_entries` and compare against the
            // flat entry ordering.
            let mut bfs_paths = Vec::new();
            let mut stack = vec![Path::new("")];
            while let Some(path) = stack.pop() {
                bfs_paths.push(path);
                let ix = stack.len();
                for child_entry in self.child_entries(path) {
                    stack.insert(ix, &child_entry.path);
                }
            }

            let dfs_paths = self
                .entries_by_path
                .cursor::<(), ()>()
                .map(|e| e.path.as_ref())
                .collect::<Vec<_>>();
            assert_eq!(bfs_paths, dfs_paths);

            for (ignore_parent_path, _) in &self.ignores {
                assert!(self.entry_for_path(ignore_parent_path).is_some());
                assert!(self
                    .entry_for_path(ignore_parent_path.join(&*GITIGNORE))
                    .is_some());
            }
        }

        // Flattens the snapshot into sorted (path, inode, is_ignored) tuples
        // so two scanners' snapshots can be compared for equality.
        fn to_vec(&self) -> Vec<(&Path, u64, bool)> {
            let mut paths = Vec::new();
            for entry in self.entries_by_path.cursor::<(), ()>() {
                paths.push((entry.path.as_ref(), entry.inode, entry.is_ignored));
            }
            paths.sort_by(|a, b| a.0.cmp(&b.0));
            paths
        }
    }
3322}