1use super::{
2 fs::{self, Fs},
3 ignore::IgnoreStack,
4};
5use ::ignore::gitignore::{Gitignore, GitignoreBuilder};
6use anyhow::{anyhow, Context, Result};
7use client::{proto, Client, PeerId, TypedEnvelope};
8use clock::ReplicaId;
9use futures::{Stream, StreamExt};
10use fuzzy::CharBag;
11use gpui::{
12 executor, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
13 Task, UpgradeModelHandle, WeakModelHandle,
14};
15use language::{Buffer, LanguageRegistry, Operation, Rope};
16use lazy_static::lazy_static;
17use lsp::LanguageServer;
18use parking_lot::Mutex;
19use postage::{
20 prelude::{Sink as _, Stream as _},
21 watch,
22};
23
24use serde::Deserialize;
25use smol::channel::{self, Sender};
26use std::{
27 any::Any,
28 cmp::{self, Ordering},
29 collections::HashMap,
30 convert::{TryFrom, TryInto},
31 ffi::{OsStr, OsString},
32 fmt,
33 future::Future,
34 ops::Deref,
35 path::{Path, PathBuf},
36 sync::{
37 atomic::{AtomicUsize, Ordering::SeqCst},
38 Arc,
39 },
40 time::{Duration, SystemTime},
41};
42use sum_tree::Bias;
43use sum_tree::{Edit, SeekTarget, SumTree};
44use util::{ResultExt, TryFutureExt};
45
lazy_static! {
    // Cached `OsStr` for the gitignore filename, so directory-entry name
    // comparisons during scans don't re-allocate.
    static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
}
49
/// Progress of the background filesystem scan for a local worktree.
#[derive(Clone, Debug)]
enum ScanState {
    /// The scanner has caught up with the filesystem.
    Idle,
    /// A scan is currently in progress.
    Scanning,
    /// Scanning failed; the error is `Arc`-wrapped so the state stays `Clone`.
    Err(Arc<anyhow::Error>),
}
56
/// A tree of files being edited: either rooted in a directory on this
/// machine (`Local`) or mirrored from a collaborator over RPC (`Remote`).
pub enum Worktree {
    Local(LocalWorktree),
    Remote(RemoteWorktree),
}
61
/// Events emitted by a [`Worktree`] model.
pub enum Event {
    /// Emitted when the worktree stops being shared (see `handle_unshare`).
    Closed,
}
65
impl Entity for Worktree {
    type Event = Event;

    /// On model drop, notify the server that this worktree is going away:
    /// local trees send `CloseWorktree`, remote trees send `LeaveWorktree`.
    /// `release` cannot await, so both sends run on detached tasks and only
    /// log on failure.
    fn release(&mut self, cx: &mut MutableAppContext) {
        match self {
            Self::Local(tree) => {
                // Only notify the server if a worktree id was ever assigned.
                if let Some(worktree_id) = *tree.remote_id.borrow() {
                    let rpc = tree.rpc.clone();
                    cx.spawn(|_| async move {
                        if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await {
                            log::error!("error closing worktree: {}", err);
                        }
                    })
                    .detach();
                }
            }
            Self::Remote(tree) => {
                let rpc = tree.client.clone();
                let worktree_id = tree.remote_id;
                cx.spawn(|_| async move {
                    if let Err(err) = rpc.send(proto::LeaveWorktree { worktree_id }).await {
                        log::error!("error closing worktree: {}", err);
                    }
                })
                .detach();
            }
        }
    }

    /// Gives the language server (if any) a chance to shut down cleanly
    /// before the application exits, by returning its shutdown future.
    fn app_will_quit(
        &mut self,
        _: &mut MutableAppContext,
    ) -> Option<std::pin::Pin<Box<dyn 'static + Future<Output = ()>>>> {
        use futures::FutureExt;

        if let Some(server) = self.language_server() {
            if let Some(shutdown) = server.shutdown() {
                return Some(
                    async move {
                        shutdown.await.log_err();
                    }
                    .boxed(),
                );
            }
        }
        None
    }
}
114
115impl Worktree {
    /// Creates a local worktree model for `path` and starts its background
    /// scanner, which watches the filesystem (100ms debounce) and reports
    /// scan states back to the model through `scan_states_tx`.
    pub async fn open_local(
        rpc: Arc<Client>,
        path: impl Into<Arc<Path>>,
        fs: Arc<dyn Fs>,
        languages: Arc<LanguageRegistry>,
        language_server: Option<Arc<LanguageServer>>,
        cx: &mut AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let (tree, scan_states_tx) =
            LocalWorktree::new(rpc, path, fs.clone(), languages, language_server, cx).await?;
        tree.update(cx, |tree, cx| {
            let tree = tree.as_local_mut().unwrap();
            let abs_path = tree.snapshot.abs_path.clone();
            let background_snapshot = tree.background_snapshot.clone();
            let background = cx.background().clone();
            // The scanner mutates the shared background snapshot; the model
            // copies it over on the foreground in `poll_snapshot`.
            tree._background_scanner_task = Some(cx.background().spawn(async move {
                let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
                let scanner =
                    BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
                scanner.run(events).await;
            }));
        });
        Ok(tree)
    }
140
141 pub async fn open_remote(
142 rpc: Arc<Client>,
143 id: u64,
144 languages: Arc<LanguageRegistry>,
145 cx: &mut AsyncAppContext,
146 ) -> Result<ModelHandle<Self>> {
147 let response = rpc.request(proto::JoinWorktree { worktree_id: id }).await?;
148 Worktree::remote(response, rpc, languages, cx).await
149 }
150
    /// Constructs a `Worktree::Remote` model from a successful `JoinWorktree`
    /// response: deserializes the entry list on a background thread, then
    /// wires up the update channel, the snapshot watch, and the RPC message
    /// subscriptions.
    async fn remote(
        join_response: proto::JoinWorktreeResponse,
        rpc: Arc<Client>,
        languages: Arc<LanguageRegistry>,
        cx: &mut AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let worktree = join_response
            .worktree
            .ok_or_else(|| anyhow!("empty worktree"))?;

        let remote_id = worktree.id;
        let replica_id = join_response.replica_id as ReplicaId;
        let peers = join_response.peers;
        // Lowercased root-name characters are used for fuzzy matching.
        let root_char_bag: CharBag = worktree
            .root_name
            .chars()
            .map(|c| c.to_ascii_lowercase())
            .collect();
        let root_name = worktree.root_name.clone();
        // The entry list can be large, so build both entry trees off the
        // main thread.
        let (entries_by_path, entries_by_id) = cx
            .background()
            .spawn(async move {
                let mut entries_by_path_edits = Vec::new();
                let mut entries_by_id_edits = Vec::new();
                for entry in worktree.entries {
                    match Entry::try_from((&root_char_bag, entry)) {
                        Ok(entry) => {
                            entries_by_id_edits.push(Edit::Insert(PathEntry {
                                id: entry.id,
                                path: entry.path.clone(),
                                is_ignored: entry.is_ignored,
                                scan_id: 0,
                            }));
                            entries_by_path_edits.push(Edit::Insert(entry));
                        }
                        // Skip undeserializable entries instead of failing
                        // the whole join.
                        Err(err) => log::warn!("error for remote worktree entry {:?}", err),
                    }
                }

                let mut entries_by_path = SumTree::new();
                let mut entries_by_id = SumTree::new();
                entries_by_path.edit(entries_by_path_edits, &());
                entries_by_id.edit(entries_by_id_edits, &());
                (entries_by_path, entries_by_id)
            })
            .await;

        let worktree = cx.update(|cx| {
            cx.add_model(|cx: &mut ModelContext<Worktree>| {
                let snapshot = Snapshot {
                    id: cx.model_id(),
                    scan_id: 0,
                    // Remote trees have no path on the local filesystem.
                    abs_path: Path::new("").into(),
                    root_name,
                    root_char_bag,
                    ignores: Default::default(),
                    entries_by_path,
                    entries_by_id,
                    removed_entry_ids: Default::default(),
                    next_entry_id: Default::default(),
                };

                let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
                let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());

                // Apply incoming worktree updates to the latest snapshot on a
                // background thread, publishing each result on the watch.
                cx.background()
                    .spawn(async move {
                        while let Some(update) = updates_rx.recv().await {
                            let mut snapshot = snapshot_tx.borrow().clone();
                            if let Err(error) = snapshot.apply_update(update) {
                                log::error!("error applying worktree update: {}", error);
                            }
                            *snapshot_tx.borrow_mut() = snapshot;
                        }
                    })
                    .detach();

                {
                    // Whenever a new snapshot is published, re-poll it on the
                    // foreground; stop once the model has been dropped.
                    let mut snapshot_rx = snapshot_rx.clone();
                    cx.spawn_weak(|this, mut cx| async move {
                        while let Some(_) = snapshot_rx.recv().await {
                            if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
                                this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
                            } else {
                                break;
                            }
                        }
                    })
                    .detach();
                }

                // Keep the subscriptions alive for the lifetime of the model.
                let _subscriptions = vec![
                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_add_peer),
                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_remove_peer),
                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_update),
                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_update_buffer),
                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved),
                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_unshare),
                ];

                Worktree::Remote(RemoteWorktree {
                    remote_id,
                    replica_id,
                    snapshot,
                    snapshot_rx,
                    updates_tx,
                    client: rpc.clone(),
                    open_buffers: Default::default(),
                    peers: peers
                        .into_iter()
                        .map(|p| (PeerId(p.peer_id), p.replica_id as ReplicaId))
                        .collect(),
                    queued_operations: Default::default(),
                    languages,
                    _subscriptions,
                })
            })
        });

        Ok(worktree)
    }
272
273 pub fn as_local(&self) -> Option<&LocalWorktree> {
274 if let Worktree::Local(worktree) = self {
275 Some(worktree)
276 } else {
277 None
278 }
279 }
280
281 pub fn as_local_mut(&mut self) -> Option<&mut LocalWorktree> {
282 if let Worktree::Local(worktree) = self {
283 Some(worktree)
284 } else {
285 None
286 }
287 }
288
289 pub fn as_remote_mut(&mut self) -> Option<&mut RemoteWorktree> {
290 if let Worktree::Remote(worktree) = self {
291 Some(worktree)
292 } else {
293 None
294 }
295 }
296
297 pub fn snapshot(&self) -> Snapshot {
298 match self {
299 Worktree::Local(worktree) => worktree.snapshot(),
300 Worktree::Remote(worktree) => worktree.snapshot(),
301 }
302 }
303
304 pub fn replica_id(&self) -> ReplicaId {
305 match self {
306 Worktree::Local(_) => 0,
307 Worktree::Remote(worktree) => worktree.replica_id,
308 }
309 }
310
311 pub fn languages(&self) -> &Arc<LanguageRegistry> {
312 match self {
313 Worktree::Local(worktree) => &worktree.languages,
314 Worktree::Remote(worktree) => &worktree.languages,
315 }
316 }
317
318 pub fn language_server(&self) -> Option<&Arc<LanguageServer>> {
319 match self {
320 Worktree::Local(worktree) => worktree.language_server.as_ref(),
321 Worktree::Remote(_) => None,
322 }
323 }
324
325 pub fn handle_add_peer(
326 &mut self,
327 envelope: TypedEnvelope<proto::AddPeer>,
328 _: Arc<Client>,
329 cx: &mut ModelContext<Self>,
330 ) -> Result<()> {
331 match self {
332 Worktree::Local(worktree) => worktree.add_peer(envelope, cx),
333 Worktree::Remote(worktree) => worktree.add_peer(envelope, cx),
334 }
335 }
336
337 pub fn handle_remove_peer(
338 &mut self,
339 envelope: TypedEnvelope<proto::RemovePeer>,
340 _: Arc<Client>,
341 cx: &mut ModelContext<Self>,
342 ) -> Result<()> {
343 match self {
344 Worktree::Local(worktree) => worktree.remove_peer(envelope, cx),
345 Worktree::Remote(worktree) => worktree.remove_peer(envelope, cx),
346 }
347 }
348
349 pub fn handle_update(
350 &mut self,
351 envelope: TypedEnvelope<proto::UpdateWorktree>,
352 _: Arc<Client>,
353 cx: &mut ModelContext<Self>,
354 ) -> anyhow::Result<()> {
355 self.as_remote_mut()
356 .unwrap()
357 .update_from_remote(envelope, cx)
358 }
359
    /// RPC handler: a guest asked to open a buffer in this (local) worktree.
    /// The open runs on the foreground; the response is sent from a detached
    /// background task once the buffer is ready, with failures logged rather
    /// than returned.
    pub fn handle_open_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::OpenBuffer>,
        rpc: Arc<Client>,
        cx: &mut ModelContext<Self>,
    ) -> anyhow::Result<()> {
        let receipt = envelope.receipt();

        let response = self
            .as_local_mut()
            .unwrap()
            .open_remote_buffer(envelope, cx);

        cx.background()
            .spawn(
                async move {
                    rpc.respond(receipt, response.await?).await?;
                    Ok(())
                }
                .log_err(),
            )
            .detach();

        Ok(())
    }
385
386 pub fn handle_close_buffer(
387 &mut self,
388 envelope: TypedEnvelope<proto::CloseBuffer>,
389 _: Arc<Client>,
390 cx: &mut ModelContext<Self>,
391 ) -> anyhow::Result<()> {
392 self.as_local_mut()
393 .unwrap()
394 .close_remote_buffer(envelope, cx)
395 }
396
397 pub fn peers(&self) -> &HashMap<PeerId, ReplicaId> {
398 match self {
399 Worktree::Local(worktree) => &worktree.peers,
400 Worktree::Remote(worktree) => &worktree.peers,
401 }
402 }
403
404 pub fn open_buffer(
405 &mut self,
406 path: impl AsRef<Path>,
407 cx: &mut ModelContext<Self>,
408 ) -> Task<Result<ModelHandle<Buffer>>> {
409 match self {
410 Worktree::Local(worktree) => worktree.open_buffer(path.as_ref(), cx),
411 Worktree::Remote(worktree) => worktree.open_buffer(path.as_ref(), cx),
412 }
413 }
414
415 #[cfg(feature = "test-support")]
416 pub fn has_open_buffer(&self, path: impl AsRef<Path>, cx: &AppContext) -> bool {
417 let mut open_buffers: Box<dyn Iterator<Item = _>> = match self {
418 Worktree::Local(worktree) => Box::new(worktree.open_buffers.values()),
419 Worktree::Remote(worktree) => {
420 Box::new(worktree.open_buffers.values().filter_map(|buf| {
421 if let RemoteBuffer::Loaded(buf) = buf {
422 Some(buf)
423 } else {
424 None
425 }
426 }))
427 }
428 };
429
430 let path = path.as_ref();
431 open_buffers
432 .find(|buffer| {
433 if let Some(file) = buffer.upgrade(cx).and_then(|buffer| buffer.read(cx).file()) {
434 file.path().as_ref() == path
435 } else {
436 false
437 }
438 })
439 .is_some()
440 }
441
442 pub fn handle_update_buffer(
443 &mut self,
444 envelope: TypedEnvelope<proto::UpdateBuffer>,
445 _: Arc<Client>,
446 cx: &mut ModelContext<Self>,
447 ) -> Result<()> {
448 let payload = envelope.payload.clone();
449 let buffer_id = payload.buffer_id as usize;
450 let ops = payload
451 .operations
452 .into_iter()
453 .map(|op| language::proto::deserialize_operation(op))
454 .collect::<Result<Vec<_>, _>>()?;
455
456 match self {
457 Worktree::Local(worktree) => {
458 let buffer = worktree
459 .open_buffers
460 .get(&buffer_id)
461 .and_then(|buf| buf.upgrade(cx))
462 .ok_or_else(|| {
463 anyhow!("invalid buffer {} in update buffer message", buffer_id)
464 })?;
465 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
466 }
467 Worktree::Remote(worktree) => match worktree.open_buffers.get_mut(&buffer_id) {
468 Some(RemoteBuffer::Operations(pending_ops)) => pending_ops.extend(ops),
469 Some(RemoteBuffer::Loaded(buffer)) => {
470 if let Some(buffer) = buffer.upgrade(cx) {
471 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
472 } else {
473 worktree
474 .open_buffers
475 .insert(buffer_id, RemoteBuffer::Operations(ops));
476 }
477 }
478 None => {
479 worktree
480 .open_buffers
481 .insert(buffer_id, RemoteBuffer::Operations(ops));
482 }
483 },
484 }
485
486 Ok(())
487 }
488
    /// RPC handler: a guest asked the host to save a shared buffer. Looks the
    /// buffer up among those shared with the sender, saves it on the
    /// foreground, and responds with the saved version and mtime from a
    /// detached background task (failures are logged).
    pub fn handle_save_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        rpc: Arc<Client>,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        let sender_id = envelope.original_sender_id()?;
        // Only buffers previously shared with this peer may be saved by it.
        let buffer = self
            .as_local()
            .unwrap()
            .shared_buffers
            .get(&sender_id)
            .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
            .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;

        let receipt = envelope.receipt();
        let worktree_id = envelope.payload.worktree_id;
        let buffer_id = envelope.payload.buffer_id;
        // The save must be initiated on the foreground (it touches the model)...
        let save = cx.spawn(|_, mut cx| async move {
            buffer.update(&mut cx, |buffer, cx| buffer.save(cx))?.await
        });

        // ...but awaiting the result and responding happens in the background.
        cx.background()
            .spawn(
                async move {
                    let (version, mtime) = save.await?;

                    rpc.respond(
                        receipt,
                        proto::BufferSaved {
                            worktree_id,
                            buffer_id,
                            version: (&version).into(),
                            mtime: Some(mtime.into()),
                        },
                    )
                    .await?;

                    Ok(())
                }
                .log_err(),
            )
            .detach();

        Ok(())
    }
535
536 pub fn handle_buffer_saved(
537 &mut self,
538 envelope: TypedEnvelope<proto::BufferSaved>,
539 _: Arc<Client>,
540 cx: &mut ModelContext<Self>,
541 ) -> Result<()> {
542 let payload = envelope.payload.clone();
543 let worktree = self.as_remote_mut().unwrap();
544 if let Some(buffer) = worktree
545 .open_buffers
546 .get(&(payload.buffer_id as usize))
547 .and_then(|buf| buf.upgrade(cx))
548 {
549 buffer.update(cx, |buffer, cx| {
550 let version = payload.version.try_into()?;
551 let mtime = payload
552 .mtime
553 .ok_or_else(|| anyhow!("missing mtime"))?
554 .into();
555 buffer.did_save(version, mtime, None, cx);
556 Result::<_, anyhow::Error>::Ok(())
557 })?;
558 }
559 Ok(())
560 }
561
    /// RPC handler: the host stopped sharing this worktree. Emits
    /// `Event::Closed` so observers can tear the worktree down.
    pub fn handle_unshare(
        &mut self,
        _: TypedEnvelope<proto::UnshareWorktree>,
        _: Arc<Client>,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        cx.emit(Event::Closed);
        Ok(())
    }
571
    /// Refreshes `self.snapshot` from the background scanner (local) or the
    /// remote snapshot watch (remote), and keeps open buffers' file metadata
    /// in sync once scanning has settled.
    fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
        match self {
            Self::Local(worktree) => {
                let is_fake_fs = worktree.fs.is_fake();
                worktree.snapshot = worktree.background_snapshot.lock().clone();
                if worktree.is_scanning() {
                    if worktree.poll_task.is_none() {
                        // While a scan is running, schedule one pending
                        // re-poll at a time: after 100ms on a real fs, or
                        // just a yield under the fake fs used in tests.
                        worktree.poll_task = Some(cx.spawn(|this, mut cx| async move {
                            if is_fake_fs {
                                smol::future::yield_now().await;
                            } else {
                                smol::Timer::after(Duration::from_millis(100)).await;
                            }
                            this.update(&mut cx, |this, cx| {
                                this.as_local_mut().unwrap().poll_task = None;
                                this.poll_snapshot(cx);
                            })
                        }));
                    }
                } else {
                    // Scan finished: cancel any pending re-poll and update
                    // buffers against the final snapshot.
                    worktree.poll_task.take();
                    self.update_open_buffers(cx);
                }
            }
            Self::Remote(worktree) => {
                worktree.snapshot = worktree.snapshot_rx.borrow().clone();
                self.update_open_buffers(cx);
            }
        };

        cx.notify();
    }
604
    /// After a snapshot change, rebuilds each open buffer's `File` so its
    /// entry id, path, and mtime reflect the new snapshot; buffers whose weak
    /// handles no longer upgrade are pruned from the open-buffer maps.
    fn update_open_buffers(&mut self, cx: &mut ModelContext<Self>) {
        // Remote trees only consider buffers that have finished loading.
        let open_buffers: Box<dyn Iterator<Item = _>> = match &self {
            Self::Local(worktree) => Box::new(worktree.open_buffers.iter()),
            Self::Remote(worktree) => {
                Box::new(worktree.open_buffers.iter().filter_map(|(id, buf)| {
                    if let RemoteBuffer::Loaded(buf) = buf {
                        Some((id, buf))
                    } else {
                        None
                    }
                }))
            }
        };

        let local = self.as_local().is_some();
        let worktree_path = self.abs_path.clone();
        let worktree_handle = cx.handle();
        let mut buffers_to_delete = Vec::new();
        for (buffer_id, buffer) in open_buffers {
            if let Some(buffer) = buffer.upgrade(cx) {
                buffer.update(cx, |buffer, cx| {
                    if let Some(old_file) = buffer.file() {
                        // Resolve the file against the new snapshot: first by
                        // entry id, then by the old path; if neither matches,
                        // the file is gone from the snapshot and keeps its
                        // old path/mtime with no entry id.
                        let new_file = if let Some(entry) = old_file
                            .entry_id()
                            .and_then(|entry_id| self.entry_for_id(entry_id))
                        {
                            File {
                                is_local: local,
                                worktree_path: worktree_path.clone(),
                                entry_id: Some(entry.id),
                                mtime: entry.mtime,
                                path: entry.path.clone(),
                                worktree: worktree_handle.clone(),
                            }
                        } else if let Some(entry) = self.entry_for_path(old_file.path().as_ref()) {
                            File {
                                is_local: local,
                                worktree_path: worktree_path.clone(),
                                entry_id: Some(entry.id),
                                mtime: entry.mtime,
                                path: entry.path.clone(),
                                worktree: worktree_handle.clone(),
                            }
                        } else {
                            File {
                                is_local: local,
                                worktree_path: worktree_path.clone(),
                                entry_id: None,
                                path: old_file.path().clone(),
                                mtime: old_file.mtime(),
                                worktree: worktree_handle.clone(),
                            }
                        };

                        if let Some(task) = buffer.file_updated(Box::new(new_file), cx) {
                            task.detach();
                        }
                    }
                });
            } else {
                // The weak handle is dead; remove it after iteration.
                buffers_to_delete.push(*buffer_id);
            }
        }

        for buffer_id in buffers_to_delete {
            match self {
                Self::Local(worktree) => {
                    worktree.open_buffers.remove(&buffer_id);
                }
                Self::Remote(worktree) => {
                    worktree.open_buffers.remove(&buffer_id);
                }
            }
        }
    }
680
    /// Routes LSP diagnostics to the open buffer for the file they concern,
    /// or stores them in `diagnostics` to be applied when the file is next
    /// opened. Only valid on local worktrees.
    fn update_diagnostics(
        &mut self,
        params: lsp::PublishDiagnosticsParams,
        cx: &mut ModelContext<Worktree>,
    ) -> Result<()> {
        let this = self.as_local_mut().ok_or_else(|| anyhow!("not local"))?;
        // Convert the file URI to a worktree-relative path.
        let file_path = params
            .uri
            .to_file_path()
            .map_err(|_| anyhow!("URI is not a file"))?
            .strip_prefix(&this.abs_path)
            .context("path is not within worktree")?
            .to_owned();

        for buffer in this.open_buffers.values() {
            if let Some(buffer) = buffer.upgrade(cx) {
                if buffer
                    .read(cx)
                    .file()
                    .map_or(false, |file| file.path().as_ref() == file_path)
                {
                    let (remote_id, operation) = buffer.update(cx, |buffer, cx| {
                        (
                            buffer.remote_id(),
                            buffer.update_diagnostics(params.version, params.diagnostics, cx),
                        )
                    });
                    // Forward the resulting operation to collaborators.
                    self.send_buffer_update(remote_id, operation?, cx);
                    return Ok(());
                }
            }
        }

        // No open buffer for this path; keep the diagnostics for later.
        this.diagnostics.insert(file_path, params.diagnostics);
        Ok(())
    }
717
    /// Sends a buffer operation to the server if this worktree currently has
    /// a remote id; on failure the operation is pushed onto
    /// `queued_operations` instead of being dropped.
    fn send_buffer_update(
        &mut self,
        buffer_id: u64,
        operation: Operation,
        cx: &mut ModelContext<Self>,
    ) {
        if let Some((rpc, remote_id)) = match self {
            // A local tree only has an id while registered with the server.
            Worktree::Local(worktree) => worktree
                .remote_id
                .borrow()
                .map(|id| (worktree.rpc.clone(), id)),
            Worktree::Remote(worktree) => Some((worktree.client.clone(), worktree.remote_id)),
        } {
            cx.spawn(|worktree, mut cx| async move {
                if let Err(error) = rpc
                    .request(proto::UpdateBuffer {
                        worktree_id: remote_id,
                        buffer_id,
                        operations: vec![language::proto::serialize_operation(&operation)],
                    })
                    .await
                {
                    worktree.update(&mut cx, |worktree, _| {
                        log::error!("error sending buffer operation: {}", error);
                        match worktree {
                            Worktree::Local(t) => &mut t.queued_operations,
                            Worktree::Remote(t) => &mut t.queued_operations,
                        }
                        .push((buffer_id, operation));
                    });
                }
            })
            .detach();
        }
    }
753}
754
755impl Deref for Worktree {
756 type Target = Snapshot;
757
758 fn deref(&self) -> &Self::Target {
759 match self {
760 Worktree::Local(worktree) => &worktree.snapshot,
761 Worktree::Remote(worktree) => &worktree.snapshot,
762 }
763 }
764}
765
/// A worktree rooted in a directory on this machine.
pub struct LocalWorktree {
    // Foreground copy of the tree state, refreshed by `poll_snapshot`.
    snapshot: Snapshot,
    // Parsed from the optional `.zed.toml` at the root.
    config: WorktreeConfig,
    // Snapshot shared with (and mutated by) the background scanner.
    background_snapshot: Arc<Mutex<Snapshot>>,
    last_scan_state_rx: watch::Receiver<ScanState>,
    _background_scanner_task: Option<Task<()>>,
    _maintain_remote_id_task: Task<Option<()>>,
    // Pending re-poll scheduled while a scan is still in progress.
    poll_task: Option<Task<()>>,
    // Server-assigned worktree id; `None` while disconnected.
    remote_id: watch::Receiver<Option<u64>>,
    share: Option<ShareState>,
    // Locally opened buffers, keyed by buffer id; weak handles, pruned
    // when they no longer upgrade.
    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
    // Buffers opened on behalf of guests, kept alive per peer.
    shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
    // Diagnostics received for files that have no open buffer yet.
    diagnostics: HashMap<PathBuf, Vec<lsp::Diagnostic>>,
    peers: HashMap<PeerId, ReplicaId>,
    // Buffer operations that failed to send (see `send_buffer_update`).
    queued_operations: Vec<(u64, Operation)>,
    languages: Arc<LanguageRegistry>,
    rpc: Arc<Client>,
    fs: Arc<dyn Fs>,
    language_server: Option<Arc<LanguageServer>>,
}
786
/// Contents of the optional `.zed.toml` file at the worktree root.
#[derive(Default, Deserialize)]
struct WorktreeConfig {
    // Logins sent to the server as `collaborator_logins` when registering.
    collaborators: Vec<String>,
}
791
792impl LocalWorktree {
    /// Builds the `Worktree::Local` model: reads the root metadata and the
    /// optional `.zed.toml` config, seeds the snapshot with the root entry,
    /// and spawns the tasks that register the tree with the server, forward
    /// scan states, and route LSP diagnostics. Returns the model plus the
    /// sender the background scanner uses to report scan states.
    async fn new(
        rpc: Arc<Client>,
        path: impl Into<Arc<Path>>,
        fs: Arc<dyn Fs>,
        languages: Arc<LanguageRegistry>,
        language_server: Option<Arc<LanguageServer>>,
        cx: &mut AsyncAppContext,
    ) -> Result<(ModelHandle<Worktree>, Sender<ScanState>)> {
        let abs_path = path.into();
        // Entries are stored relative to the root; the root itself is the
        // empty path.
        let path: Arc<Path> = Arc::from(Path::new(""));
        let next_entry_id = AtomicUsize::new(0);

        // After determining whether the root entry is a file or a directory, populate the
        // snapshot's "root name", which will be used for the purpose of fuzzy matching.
        let root_name = abs_path
            .file_name()
            .map_or(String::new(), |f| f.to_string_lossy().to_string());
        let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
        let metadata = fs.metadata(&abs_path).await?;

        // A missing or unparsable `.zed.toml` silently falls back to the
        // default (empty) config.
        let mut config = WorktreeConfig::default();
        if let Ok(zed_toml) = fs.load(&abs_path.join(".zed.toml")).await {
            if let Ok(parsed) = toml::from_str(&zed_toml) {
                config = parsed;
            }
        }

        let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
        let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
        let tree = cx.add_model(move |cx: &mut ModelContext<Worktree>| {
            let mut snapshot = Snapshot {
                id: cx.model_id(),
                scan_id: 0,
                abs_path,
                root_name: root_name.clone(),
                root_char_bag,
                ignores: Default::default(),
                entries_by_path: Default::default(),
                entries_by_id: Default::default(),
                removed_entry_ids: Default::default(),
                next_entry_id: Arc::new(next_entry_id),
            };
            if let Some(metadata) = metadata {
                snapshot.insert_entry(
                    Entry::new(
                        path.into(),
                        &metadata,
                        &snapshot.next_entry_id,
                        snapshot.root_char_bag,
                    ),
                    fs.as_ref(),
                );
            }

            // Track the RPC connection: (re-)register the worktree with the
            // server whenever we connect, and clear the id otherwise.
            let (mut remote_id_tx, remote_id_rx) = watch::channel();
            let _maintain_remote_id_task = cx.spawn_weak({
                let rpc = rpc.clone();
                move |this, cx| {
                    async move {
                        let mut status = rpc.status();
                        while let Some(status) = status.recv().await {
                            if let Some(this) = this.upgrade(&cx) {
                                let remote_id = if let client::Status::Connected { .. } = status {
                                    let collaborator_logins = this.read_with(&cx, |this, _| {
                                        this.as_local().unwrap().config.collaborators.clone()
                                    });
                                    let response = rpc
                                        .request(proto::OpenWorktree {
                                            root_name: root_name.clone(),
                                            collaborator_logins,
                                        })
                                        .await?;

                                    Some(response.worktree_id)
                                } else {
                                    None
                                };
                                // Receiver dropped means the model is gone.
                                if remote_id_tx.send(remote_id).await.is_err() {
                                    break;
                                }
                            }
                        }
                        Ok(())
                    }
                    .log_err()
                }
            });

            let tree = Self {
                snapshot: snapshot.clone(),
                config,
                remote_id: remote_id_rx,
                background_snapshot: Arc::new(Mutex::new(snapshot)),
                last_scan_state_rx,
                // Installed later by `Worktree::open_local`.
                _background_scanner_task: None,
                _maintain_remote_id_task,
                share: None,
                poll_task: None,
                open_buffers: Default::default(),
                shared_buffers: Default::default(),
                diagnostics: Default::default(),
                queued_operations: Default::default(),
                peers: Default::default(),
                languages,
                rpc,
                fs,
                language_server,
            };

            // Forward scan states from the background scanner to the model:
            // re-poll the snapshot, and once scanning settles while sharing,
            // push the fresh snapshot to collaborators.
            cx.spawn_weak(|this, mut cx| async move {
                while let Ok(scan_state) = scan_states_rx.recv().await {
                    if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
                        let to_send = handle.update(&mut cx, |this, cx| {
                            last_scan_state_tx.blocking_send(scan_state).ok();
                            this.poll_snapshot(cx);
                            let tree = this.as_local_mut().unwrap();
                            if !tree.is_scanning() {
                                if let Some(share) = tree.share.as_ref() {
                                    return Some((tree.snapshot(), share.snapshots_tx.clone()));
                                }
                            }
                            None
                        });

                        if let Some((snapshot, snapshots_to_send_tx)) = to_send {
                            if let Err(err) = snapshots_to_send_tx.send(snapshot).await {
                                log::error!("error submitting snapshot to send {}", err);
                            }
                        }
                    } else {
                        break;
                    }
                }
            })
            .detach();

            // Bridge diagnostics notifications from the language server's
            // callback thread into the model through a channel.
            if let Some(language_server) = &tree.language_server {
                let (diagnostics_tx, diagnostics_rx) = smol::channel::unbounded();
                language_server
                    .on_notification::<lsp::notification::PublishDiagnostics, _>(move |params| {
                        smol::block_on(diagnostics_tx.send(params)).ok();
                    })
                    .detach();
                cx.spawn_weak(|this, mut cx| async move {
                    while let Ok(diagnostics) = diagnostics_rx.recv().await {
                        if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
                            handle.update(&mut cx, |this, cx| {
                                this.update_diagnostics(diagnostics, cx).log_err();
                            });
                        } else {
                            break;
                        }
                    }
                })
                .detach();
            }

            Worktree::Local(tree)
        });

        Ok((tree, scan_states_tx))
    }
955
    /// Opens the buffer for a worktree-relative `path`, reusing an already
    /// open buffer when one exists. Newly loaded buffers are assigned their
    /// language, the language server, and any diagnostics that arrived
    /// before the buffer was opened.
    pub fn open_buffer(
        &mut self,
        path: &Path,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        let handle = cx.handle();

        // If there is already a buffer for the given path, then return it.
        let mut existing_buffer = None;
        // `retain` doubles as garbage collection for dead weak handles.
        self.open_buffers.retain(|_buffer_id, buffer| {
            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                if let Some(file) = buffer.read(cx.as_ref()).file() {
                    if file.worktree_id() == handle.id() && file.path().as_ref() == path {
                        existing_buffer = Some(buffer);
                    }
                }
                true
            } else {
                false
            }
        });

        let path = Arc::from(path);
        let language_server = self.language_server.clone();
        cx.spawn(|this, mut cx| async move {
            if let Some(existing_buffer) = existing_buffer {
                Ok(existing_buffer)
            } else {
                let (file, contents) = this
                    .update(&mut cx, |this, cx| this.as_local().unwrap().load(&path, cx))
                    .await?;
                let language = this.read_with(&cx, |this, _| {
                    use language::File;
                    this.languages().select_language(file.full_path()).cloned()
                });
                // Diagnostics stored before the buffer existed are consumed
                // here and applied to the new buffer below.
                let diagnostics = this.update(&mut cx, |this, _| {
                    this.as_local_mut()
                        .unwrap()
                        .diagnostics
                        .remove(path.as_ref())
                });
                let buffer = cx.add_model(|cx| {
                    let mut buffer = Buffer::from_file(0, contents, Box::new(file), cx);
                    buffer.set_language(language, language_server, cx);
                    if let Some(diagnostics) = diagnostics {
                        buffer.update_diagnostics(None, diagnostics, cx).unwrap();
                    }
                    buffer
                });
                this.update(&mut cx, |this, _| {
                    let this = this
                        .as_local_mut()
                        .ok_or_else(|| anyhow!("must be a local worktree"))?;

                    this.open_buffers.insert(buffer.id(), buffer.downgrade());
                    Ok(buffer)
                })
            }
        })
    }
1016
    /// Opens a buffer on behalf of a remote guest and registers a strong
    /// handle in `shared_buffers` (keyed by the requesting peer) so the
    /// buffer stays alive while shared. Resolves to the buffer serialized
    /// for the wire.
    pub fn open_remote_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::OpenBuffer>,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<proto::OpenBufferResponse>> {
        let peer_id = envelope.original_sender_id();
        let path = Path::new(&envelope.payload.path);

        let buffer = self.open_buffer(path, cx);

        cx.spawn(|this, mut cx| async move {
            let buffer = buffer.await?;
            this.update(&mut cx, |this, cx| {
                this.as_local_mut()
                    .unwrap()
                    .shared_buffers
                    .entry(peer_id?)
                    .or_default()
                    .insert(buffer.id() as u64, buffer.clone());

                Ok(proto::OpenBufferResponse {
                    buffer: Some(buffer.update(cx.as_mut(), |buffer, _| buffer.to_proto())),
                })
            })
        })
    }
1043
1044 pub fn close_remote_buffer(
1045 &mut self,
1046 envelope: TypedEnvelope<proto::CloseBuffer>,
1047 cx: &mut ModelContext<Worktree>,
1048 ) -> Result<()> {
1049 if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
1050 shared_buffers.remove(&envelope.payload.buffer_id);
1051 cx.notify();
1052 }
1053
1054 Ok(())
1055 }
1056
1057 pub fn add_peer(
1058 &mut self,
1059 envelope: TypedEnvelope<proto::AddPeer>,
1060 cx: &mut ModelContext<Worktree>,
1061 ) -> Result<()> {
1062 let peer = envelope
1063 .payload
1064 .peer
1065 .as_ref()
1066 .ok_or_else(|| anyhow!("empty peer"))?;
1067 self.peers
1068 .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
1069 cx.notify();
1070
1071 Ok(())
1072 }
1073
1074 pub fn remove_peer(
1075 &mut self,
1076 envelope: TypedEnvelope<proto::RemovePeer>,
1077 cx: &mut ModelContext<Worktree>,
1078 ) -> Result<()> {
1079 let peer_id = PeerId(envelope.payload.peer_id);
1080 let replica_id = self
1081 .peers
1082 .remove(&peer_id)
1083 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
1084 self.shared_buffers.remove(&peer_id);
1085 for (_, buffer) in &self.open_buffers {
1086 if let Some(buffer) = buffer.upgrade(cx) {
1087 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
1088 }
1089 }
1090 cx.notify();
1091
1092 Ok(())
1093 }
1094
1095 pub fn scan_complete(&self) -> impl Future<Output = ()> {
1096 let mut scan_state_rx = self.last_scan_state_rx.clone();
1097 async move {
1098 let mut scan_state = Some(scan_state_rx.borrow().clone());
1099 while let Some(ScanState::Scanning) = scan_state {
1100 scan_state = scan_state_rx.recv().await;
1101 }
1102 }
1103 }
1104
    /// The server-assigned worktree id, if currently registered.
    pub fn remote_id(&self) -> Option<u64> {
        *self.remote_id.borrow()
    }
1108
    /// Resolves once a server-assigned id becomes available, or to `None`
    /// if the id channel closes before one arrives.
    pub fn next_remote_id(&self) -> impl Future<Output = Option<u64>> {
        let mut remote_id = self.remote_id.clone();
        async move {
            while let Some(remote_id) = remote_id.recv().await {
                if remote_id.is_some() {
                    return remote_id;
                }
            }
            None
        }
    }
1120
1121 fn is_scanning(&self) -> bool {
1122 if let ScanState::Scanning = *self.last_scan_state_rx.borrow() {
1123 true
1124 } else {
1125 false
1126 }
1127 }
1128
    /// Returns a clone of the current foreground snapshot.
    pub fn snapshot(&self) -> Snapshot {
        self.snapshot.clone()
    }
1132
    /// The absolute path of the worktree root on the local filesystem.
    pub fn abs_path(&self) -> &Path {
        self.snapshot.abs_path.as_ref()
    }
1136
    /// Whether the given absolute path lies inside this worktree's root.
    pub fn contains_abs_path(&self, path: &Path) -> bool {
        path.starts_with(&self.snapshot.abs_path)
    }
1140
    /// Converts a worktree-relative path to an absolute one. A path with no
    /// file name (i.e. the empty root path) maps to the worktree root itself.
    fn absolutize(&self, path: &Path) -> PathBuf {
        if path.file_name().is_some() {
            self.snapshot.abs_path.join(path)
        } else {
            self.snapshot.abs_path.to_path_buf()
        }
    }
1148
    /// Loads the file at the worktree-relative `path`, returning its `File`
    /// metadata and contents. Also refreshes the file's entry in the
    /// background snapshot so the tree reflects the just-read state.
    fn load(&self, path: &Path, cx: &mut ModelContext<Worktree>) -> Task<Result<(File, String)>> {
        let handle = cx.handle();
        let path = Arc::from(path);
        let worktree_path = self.abs_path.clone();
        let abs_path = self.absolutize(&path);
        let background_snapshot = self.background_snapshot.clone();
        let fs = self.fs.clone();
        cx.spawn(|this, mut cx| async move {
            let text = fs.load(&abs_path).await?;
            // Eagerly populate the snapshot with an updated entry for the loaded file
            let entry = refresh_entry(fs.as_ref(), &background_snapshot, path, &abs_path).await?;
            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
            Ok((
                File {
                    entry_id: Some(entry.id),
                    worktree: handle,
                    worktree_path,
                    path: entry.path,
                    mtime: entry.mtime,
                    is_local: true,
                },
                text,
            ))
        })
    }
1174
    /// Saves `text` to `path` and registers `buffer` as the open buffer for
    /// that path, resolving to the resulting `File` metadata.
    pub fn save_buffer_as(
        &self,
        buffer: ModelHandle<Buffer>,
        path: impl Into<Arc<Path>>,
        text: Rope,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<File>> {
        let save = self.save(path, text, cx);
        cx.spawn(|this, mut cx| async move {
            let entry = save.await?;
            this.update(&mut cx, |this, cx| {
                let this = this.as_local_mut().unwrap();
                this.open_buffers.insert(buffer.id(), buffer.downgrade());
                Ok(File {
                    entry_id: Some(entry.id),
                    worktree: cx.handle(),
                    worktree_path: this.abs_path.clone(),
                    path: entry.path,
                    mtime: entry.mtime,
                    is_local: true,
                })
            })
        })
    }
1199
    /// Writes `text` to disk at the given worktree-relative `path` on a
    /// background thread, refreshes the corresponding snapshot entry, and
    /// returns the refreshed entry.
    fn save(
        &self,
        path: impl Into<Arc<Path>>,
        text: Rope,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<Entry>> {
        let path = path.into();
        let abs_path = self.absolutize(&path);
        let background_snapshot = self.background_snapshot.clone();
        let fs = self.fs.clone();
        // Disk I/O and entry refresh happen off the foreground thread.
        let save = cx.background().spawn(async move {
            fs.save(&abs_path, &text).await?;
            refresh_entry(fs.as_ref(), &background_snapshot, path.clone(), &abs_path).await
        });

        cx.spawn(|this, mut cx| async move {
            let entry = save.await?;
            // Surface the updated background snapshot to the foreground model.
            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
            Ok(entry)
        })
    }
1221
    /// Shares this worktree with collaborators: sends the initial snapshot to
    /// the server, starts a background task that streams incremental snapshot
    /// diffs, subscribes to collaboration messages, and records the share
    /// state. Resolves to the worktree's remote id.
    pub fn share(&mut self, cx: &mut ModelContext<Worktree>) -> Task<anyhow::Result<u64>> {
        let snapshot = self.snapshot();
        let share_request = self.share_request(cx);
        let rpc = self.rpc.clone();
        cx.spawn(|this, mut cx| async move {
            let share_request = if let Some(request) = share_request.await {
                request
            } else {
                return Err(anyhow!("failed to open worktree on the server"));
            };

            let remote_id = share_request.worktree.as_ref().unwrap().id;
            let share_response = rpc.request(share_request).await?;

            log::info!("sharing worktree {:?}", share_response);
            let (snapshots_to_send_tx, snapshots_to_send_rx) =
                smol::channel::unbounded::<Snapshot>();

            // Stream snapshot diffs to the server for as long as the sender
            // (held in `ShareState`) stays alive.
            cx.background()
                .spawn({
                    let rpc = rpc.clone();
                    async move {
                        let mut prev_snapshot = snapshot;
                        while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
                            let message = snapshot.build_update(&prev_snapshot, remote_id, false);
                            match rpc.send(message).await {
                                // Only advance the baseline when the diff was
                                // delivered, so a failed send is retried
                                // implicitly by the next, larger diff.
                                Ok(()) => prev_snapshot = snapshot,
                                Err(err) => log::error!("error sending snapshot diff {}", err),
                            }
                        }
                    }
                })
                .detach();

            this.update(&mut cx, |worktree, cx| {
                // Subscriptions are kept alive by storing them in `ShareState`.
                let _subscriptions = vec![
                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_add_peer),
                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_remove_peer),
                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_open_buffer),
                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_close_buffer),
                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_update_buffer),
                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_save_buffer),
                ];

                let worktree = worktree.as_local_mut().unwrap();
                worktree.share = Some(ShareState {
                    snapshots_tx: snapshots_to_send_tx,
                    _subscriptions,
                });
            });

            Ok(remote_id)
        })
    }
1276
    /// Stops sharing: drops the share state (ending the snapshot-diff stream
    /// and RPC subscriptions) and notifies the server, logging any error.
    pub fn unshare(&mut self, cx: &mut ModelContext<Worktree>) {
        self.share.take();
        let rpc = self.rpc.clone();
        let remote_id = self.remote_id();
        cx.foreground()
            .spawn(
                async move {
                    // Only notify the server if the worktree ever received an id.
                    if let Some(worktree_id) = remote_id {
                        rpc.send(proto::UnshareWorktree { worktree_id }).await?;
                    }
                    Ok(())
                }
                .log_err(),
            )
            .detach()
    }
1293
    /// Builds the `ShareWorktree` request on a background thread, waiting for
    /// the remote id to be assigned. Ignored entries are excluded from the
    /// initial entry list. Resolves to `None` if no id is ever assigned.
    fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<Option<proto::ShareWorktree>> {
        let remote_id = self.next_remote_id();
        let snapshot = self.snapshot();
        let root_name = self.root_name.clone();
        cx.background().spawn(async move {
            remote_id.await.map(|id| {
                let entries = snapshot
                    .entries_by_path
                    .cursor::<()>()
                    .filter(|e| !e.is_ignored)
                    .map(Into::into)
                    .collect();
                proto::ShareWorktree {
                    worktree: Some(proto::Worktree {
                        id,
                        root_name,
                        entries,
                    }),
                }
            })
        })
    }
1316}
1317
1318fn build_gitignore(abs_path: &Path, fs: &dyn Fs) -> Result<Gitignore> {
1319 let contents = smol::block_on(fs.load(&abs_path))?;
1320 let parent = abs_path.parent().unwrap_or(Path::new("/"));
1321 let mut builder = GitignoreBuilder::new(parent);
1322 for line in contents.lines() {
1323 builder.add_line(Some(abs_path.into()), line)?;
1324 }
1325 Ok(builder.build()?)
1326}
1327
// Allow `LocalWorktree` to be used wherever a `Snapshot` is expected,
// exposing the snapshot's read-only API directly on the worktree.
impl Deref for LocalWorktree {
    type Target = Snapshot;

    fn deref(&self) -> &Self::Target {
        &self.snapshot
    }
}
1335
// Debug output delegates to the snapshot's entry-tree dump.
impl fmt::Debug for LocalWorktree {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.snapshot.fmt(f)
    }
}
1341
/// State held while a local worktree is shared. Dropping it closes the
/// snapshot-diff channel and cancels the RPC subscriptions.
struct ShareState {
    // Feeds snapshots to the background task that diffs and uploads them.
    snapshots_tx: Sender<Snapshot>,
    // Held only to keep the server-message subscriptions alive.
    _subscriptions: Vec<client::Subscription>,
}
1346
/// A worktree whose file system lives on a collaborator's machine; all state
/// is mirrored via RPC updates.
pub struct RemoteWorktree {
    // Server-assigned id of the shared worktree.
    remote_id: u64,
    snapshot: Snapshot,
    // Receives successive snapshots as remote updates are applied.
    snapshot_rx: watch::Receiver<Snapshot>,
    client: Arc<Client>,
    // Queues incoming `UpdateWorktree` messages for sequential application.
    updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
    replica_id: ReplicaId,
    // Buffers keyed by remote buffer id; may hold queued ops before loading.
    open_buffers: HashMap<usize, RemoteBuffer>,
    // Maps collaborator peer ids to their replica ids.
    peers: HashMap<PeerId, ReplicaId>,
    languages: Arc<LanguageRegistry>,
    // Operations that could not be sent yet, keyed by buffer id.
    queued_operations: Vec<(u64, Operation)>,
    _subscriptions: Vec<client::Subscription>,
}
1360
1361impl RemoteWorktree {
    /// Opens (or returns an already-open) buffer for `path` by requesting its
    /// contents from the host over RPC. Dead buffer handles are pruned from
    /// `open_buffers` as a side effect, and any operations that arrived while
    /// the buffer was loading are applied before it is returned.
    pub fn open_buffer(
        &mut self,
        path: &Path,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        let mut existing_buffer = None;
        // Drop entries whose buffers have been released; reuse a live buffer
        // that already represents this path.
        self.open_buffers.retain(|_buffer_id, buffer| {
            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                if let Some(file) = buffer.read(cx.as_ref()).file() {
                    if file.worktree_id() == cx.model_id() && file.path().as_ref() == path {
                        existing_buffer = Some(buffer);
                    }
                }
                true
            } else {
                false
            }
        });

        let rpc = self.client.clone();
        let replica_id = self.replica_id;
        let remote_worktree_id = self.remote_id;
        let root_path = self.snapshot.abs_path.clone();
        let path = path.to_string_lossy().to_string();
        cx.spawn_weak(|this, mut cx| async move {
            if let Some(existing_buffer) = existing_buffer {
                Ok(existing_buffer)
            } else {
                // The path must exist in the mirrored snapshot before we ask
                // the host for its contents.
                let entry = this
                    .upgrade(&cx)
                    .ok_or_else(|| anyhow!("worktree was closed"))?
                    .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned())
                    .ok_or_else(|| anyhow!("file does not exist"))?;
                let response = rpc
                    .request(proto::OpenBuffer {
                        worktree_id: remote_worktree_id as u64,
                        path,
                    })
                    .await?;

                // The worktree may have been closed while awaiting the response.
                let this = this
                    .upgrade(&cx)
                    .ok_or_else(|| anyhow!("worktree was closed"))?;
                let file = File {
                    entry_id: Some(entry.id),
                    worktree: this.clone(),
                    worktree_path: root_path,
                    path: entry.path,
                    mtime: entry.mtime,
                    is_local: false,
                };
                let language = this.read_with(&cx, |this, _| {
                    use language::File;
                    this.languages().select_language(file.full_path()).cloned()
                });
                let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
                let buffer_id = remote_buffer.id as usize;
                let buffer = cx.add_model(|cx| {
                    Buffer::from_proto(replica_id, remote_buffer, Some(Box::new(file)), cx)
                        .unwrap()
                        .with_language(language, None, cx)
                });
                this.update(&mut cx, |this, cx| {
                    let this = this.as_remote_mut().unwrap();
                    // Apply any operations that were queued while loading.
                    if let Some(RemoteBuffer::Operations(pending_ops)) = this
                        .open_buffers
                        .insert(buffer_id, RemoteBuffer::Loaded(buffer.downgrade()))
                    {
                        buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?;
                    }
                    Result::<_, anyhow::Error>::Ok(())
                })?;
                Ok(buffer)
            }
        })
    }
1438
    /// The server-assigned id of this shared worktree.
    pub fn remote_id(&self) -> u64 {
        self.remote_id
    }
1442
1443 pub fn close_all_buffers(&mut self, cx: &mut MutableAppContext) {
1444 for (_, buffer) in self.open_buffers.drain() {
1445 if let RemoteBuffer::Loaded(buffer) = buffer {
1446 if let Some(buffer) = buffer.upgrade(cx) {
1447 buffer.update(cx, |buffer, cx| buffer.close(cx))
1448 }
1449 }
1450 }
1451 }
1452
    /// Returns an owned copy of the current mirrored snapshot.
    fn snapshot(&self) -> Snapshot {
        self.snapshot.clone()
    }
1456
    /// Queues an incoming `UpdateWorktree` message for application by the
    /// task draining `updates_tx`'s receiver, preserving arrival order.
    fn update_from_remote(
        &mut self,
        envelope: TypedEnvelope<proto::UpdateWorktree>,
        cx: &mut ModelContext<Worktree>,
    ) -> Result<()> {
        let mut tx = self.updates_tx.clone();
        let payload = envelope.payload.clone();
        cx.background()
            .spawn(async move {
                // The receiving task is expected to outlive all senders.
                tx.send(payload).await.expect("receiver runs to completion");
            })
            .detach();

        Ok(())
    }
1472
1473 pub fn add_peer(
1474 &mut self,
1475 envelope: TypedEnvelope<proto::AddPeer>,
1476 cx: &mut ModelContext<Worktree>,
1477 ) -> Result<()> {
1478 let peer = envelope
1479 .payload
1480 .peer
1481 .as_ref()
1482 .ok_or_else(|| anyhow!("empty peer"))?;
1483 self.peers
1484 .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
1485 cx.notify();
1486 Ok(())
1487 }
1488
    /// Removes a departed collaborator, clearing their replica's presence
    /// (selections, etc.) from every live open buffer.
    pub fn remove_peer(
        &mut self,
        envelope: TypedEnvelope<proto::RemovePeer>,
        cx: &mut ModelContext<Worktree>,
    ) -> Result<()> {
        let peer_id = PeerId(envelope.payload.peer_id);
        let replica_id = self
            .peers
            .remove(&peer_id)
            .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
        for (_, buffer) in &self.open_buffers {
            if let Some(buffer) = buffer.upgrade(cx) {
                buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
            }
        }
        cx.notify();
        Ok(())
    }
1507}
1508
/// A remote buffer slot: either operations that arrived before the buffer
/// finished loading, or a weak handle to the loaded buffer model.
enum RemoteBuffer {
    Operations(Vec<Operation>),
    Loaded(WeakModelHandle<Buffer>),
}
1513
1514impl RemoteBuffer {
1515 fn upgrade(&self, cx: &impl UpgradeModelHandle) -> Option<ModelHandle<Buffer>> {
1516 match self {
1517 Self::Operations(_) => None,
1518 Self::Loaded(buffer) => buffer.upgrade(cx),
1519 }
1520 }
1521}
1522
/// An immutable view of a worktree's file tree at a point in time.
#[derive(Clone)]
pub struct Snapshot {
    id: usize,
    // Incremented on every scan pass; stamped onto touched entries.
    scan_id: usize,
    abs_path: Arc<Path>,
    root_name: String,
    root_char_bag: CharBag,
    // Maps a directory (relative path) to its parsed `.gitignore` and the
    // scan id at which that ignore file was last (re)loaded.
    ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
    // Entries ordered by path; the primary index.
    entries_by_path: SumTree<Entry>,
    // Secondary index ordered by entry id.
    entries_by_id: SumTree<PathEntry>,
    // Maps inode -> id of a recently-removed entry, so a reappearing file
    // keeps a stable id (see `reuse_entry_id`).
    removed_entry_ids: HashMap<u64, usize>,
    next_entry_id: Arc<AtomicUsize>,
}
1536
1537impl Snapshot {
    /// This snapshot's worktree id.
    pub fn id(&self) -> usize {
        self.id
    }
1541
    /// Diffs this snapshot against an older one, producing an `UpdateWorktree`
    /// message containing entries that were added/changed and ids of entries
    /// that were removed. Both entry lists are walked in id order (a merge
    /// over the `entries_by_id` trees), so the three-way comparison below is
    /// a standard sorted-merge.
    pub fn build_update(
        &self,
        other: &Self,
        worktree_id: u64,
        include_ignored: bool,
    ) -> proto::UpdateWorktree {
        let mut updated_entries = Vec::new();
        let mut removed_entries = Vec::new();
        let mut self_entries = self
            .entries_by_id
            .cursor::<()>()
            .filter(|e| include_ignored || !e.is_ignored)
            .peekable();
        let mut other_entries = other
            .entries_by_id
            .cursor::<()>()
            .filter(|e| include_ignored || !e.is_ignored)
            .peekable();
        loop {
            match (self_entries.peek(), other_entries.peek()) {
                (Some(self_entry), Some(other_entry)) => {
                    match Ord::cmp(&self_entry.id, &other_entry.id) {
                        // Present only in the new snapshot: added.
                        Ordering::Less => {
                            let entry = self.entry_for_id(self_entry.id).unwrap().into();
                            updated_entries.push(entry);
                            self_entries.next();
                        }
                        // Present in both: changed only if rescanned since.
                        Ordering::Equal => {
                            if self_entry.scan_id != other_entry.scan_id {
                                let entry = self.entry_for_id(self_entry.id).unwrap().into();
                                updated_entries.push(entry);
                            }

                            self_entries.next();
                            other_entries.next();
                        }
                        // Present only in the old snapshot: removed.
                        Ordering::Greater => {
                            removed_entries.push(other_entry.id as u64);
                            other_entries.next();
                        }
                    }
                }
                // Remaining new entries are all additions.
                (Some(self_entry), None) => {
                    let entry = self.entry_for_id(self_entry.id).unwrap().into();
                    updated_entries.push(entry);
                    self_entries.next();
                }
                // Remaining old entries are all removals.
                (None, Some(other_entry)) => {
                    removed_entries.push(other_entry.id as u64);
                    other_entries.next();
                }
                (None, None) => break,
            }
        }

        proto::UpdateWorktree {
            updated_entries,
            removed_entries,
            worktree_id,
        }
    }
1603
    /// Applies an incremental update received from the host: removes the
    /// listed entries and inserts/replaces the updated ones in both indices,
    /// bumping the scan id. Fails if a removed entry id is unknown.
    fn apply_update(&mut self, update: proto::UpdateWorktree) -> Result<()> {
        self.scan_id += 1;
        let scan_id = self.scan_id;

        let mut entries_by_path_edits = Vec::new();
        let mut entries_by_id_edits = Vec::new();
        for entry_id in update.removed_entries {
            let entry_id = entry_id as usize;
            let entry = self
                .entry_for_id(entry_id)
                .ok_or_else(|| anyhow!("unknown entry"))?;
            entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
            entries_by_id_edits.push(Edit::Remove(entry.id));
        }

        for entry in update.updated_entries {
            let entry = Entry::try_from((&self.root_char_bag, entry))?;
            // If the entry moved, drop its old path key so the path index
            // doesn't retain a stale location.
            if let Some(PathEntry { path, .. }) = self.entries_by_id.get(&entry.id, &()) {
                entries_by_path_edits.push(Edit::Remove(PathKey(path.clone())));
            }
            entries_by_id_edits.push(Edit::Insert(PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                is_ignored: entry.is_ignored,
                scan_id,
            }));
            entries_by_path_edits.push(Edit::Insert(entry));
        }

        // Both edit batches are applied atomically per tree.
        self.entries_by_path.edit(entries_by_path_edits, &());
        self.entries_by_id.edit(entries_by_id_edits, &());

        Ok(())
    }
1638
    /// Total number of file entries (directories excluded).
    pub fn file_count(&self) -> usize {
        self.entries_by_path.summary().file_count
    }

    /// Number of non-ignored file entries.
    pub fn visible_file_count(&self) -> usize {
        self.entries_by_path.summary().visible_file_count
    }
1646
    /// Starts a traversal positioned at the entry with ordinal `start_offset`,
    /// where the ordinal counts only entries matching the `include_dirs` /
    /// `include_ignored` filters.
    fn traverse_from_offset(
        &self,
        include_dirs: bool,
        include_ignored: bool,
        start_offset: usize,
    ) -> Traversal {
        let mut cursor = self.entries_by_path.cursor();
        cursor.seek(
            &TraversalTarget::Count {
                count: start_offset,
                include_dirs,
                include_ignored,
            },
            Bias::Right,
            &(),
        );
        Traversal {
            cursor,
            include_dirs,
            include_ignored,
        }
    }
1669
    /// Starts a traversal positioned at `path` (or, with `Bias::Left`, at the
    /// first entry not before it), filtered by the given flags.
    fn traverse_from_path(
        &self,
        include_dirs: bool,
        include_ignored: bool,
        path: &Path,
    ) -> Traversal {
        let mut cursor = self.entries_by_path.cursor();
        cursor.seek(&TraversalTarget::Path(path), Bias::Left, &());
        Traversal {
            cursor,
            include_dirs,
            include_ignored,
        }
    }
1684
    /// Traverses file entries only, starting at ordinal `start`.
    pub fn files(&self, include_ignored: bool, start: usize) -> Traversal {
        self.traverse_from_offset(false, include_ignored, start)
    }

    /// Traverses all entries (files and directories) from the beginning.
    pub fn entries(&self, include_ignored: bool) -> Traversal {
        self.traverse_from_offset(true, include_ignored, 0)
    }
1692
1693 pub fn paths(&self) -> impl Iterator<Item = &Arc<Path>> {
1694 let empty_path = Path::new("");
1695 self.entries_by_path
1696 .cursor::<()>()
1697 .filter(move |entry| entry.path.as_ref() != empty_path)
1698 .map(|entry| &entry.path)
1699 }
1700
    /// Iterates over the immediate children of `parent_path`. The cursor is
    /// seeked just past the parent itself (`Bias::Right`) so the iterator
    /// starts at its first descendant.
    fn child_entries<'a>(&'a self, parent_path: &'a Path) -> ChildEntriesIter<'a> {
        let mut cursor = self.entries_by_path.cursor();
        cursor.seek(&TraversalTarget::Path(parent_path), Bias::Right, &());
        let traversal = Traversal {
            cursor,
            include_dirs: true,
            include_ignored: true,
        };
        ChildEntriesIter {
            traversal,
            parent_path,
        }
    }
1714
    /// The entry for the worktree root, whose relative path is empty.
    pub fn root_entry(&self) -> Option<&Entry> {
        self.entry_for_path("")
    }

    /// The display name of the worktree root.
    pub fn root_name(&self) -> &str {
        &self.root_name
    }
1722
1723 pub fn entry_for_path(&self, path: impl AsRef<Path>) -> Option<&Entry> {
1724 let path = path.as_ref();
1725 self.traverse_from_path(true, true, path)
1726 .entry()
1727 .and_then(|entry| {
1728 if entry.path.as_ref() == path {
1729 Some(entry)
1730 } else {
1731 None
1732 }
1733 })
1734 }
1735
    /// Looks up an entry by id via the id index, then resolves it through the
    /// path index.
    pub fn entry_for_id(&self, id: usize) -> Option<&Entry> {
        let entry = self.entries_by_id.get(&id, &())?;
        self.entry_for_path(&entry.path)
    }

    /// The inode of the entry at `path`, if it exists.
    pub fn inode_for_path(&self, path: impl AsRef<Path>) -> Option<u64> {
        self.entry_for_path(path.as_ref()).map(|e| e.inode)
    }
1744
    /// Inserts (or replaces) a single entry in both indices. If the entry is
    /// a `.gitignore` file, its patterns are (re)loaded into the `ignores`
    /// map for the containing directory. Entry ids are reused when possible.
    fn insert_entry(&mut self, mut entry: Entry, fs: &dyn Fs) -> Entry {
        if !entry.is_dir() && entry.path.file_name() == Some(&GITIGNORE) {
            let abs_path = self.abs_path.join(&entry.path);
            match build_gitignore(&abs_path, fs) {
                Ok(ignore) => {
                    let ignore_dir_path = entry.path.parent().unwrap();
                    self.ignores
                        .insert(ignore_dir_path.into(), (Arc::new(ignore), self.scan_id));
                }
                // A malformed ignore file is logged and skipped rather than
                // failing the insertion.
                Err(error) => {
                    log::error!(
                        "error loading .gitignore file {:?} - {:?}",
                        &entry.path,
                        error
                    );
                }
            }
        }

        self.reuse_entry_id(&mut entry);
        self.entries_by_path.insert_or_replace(entry.clone(), &());
        self.entries_by_id.insert_or_replace(
            PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                is_ignored: entry.is_ignored,
                scan_id: self.scan_id,
            },
            &(),
        );
        entry
    }
1777
    /// Fills in the children of a directory that was previously inserted as
    /// `PendingDir`, marking the parent as a fully-scanned `Dir` and
    /// registering the directory's `.gitignore` (if one was found).
    fn populate_dir(
        &mut self,
        parent_path: Arc<Path>,
        entries: impl IntoIterator<Item = Entry>,
        ignore: Option<Arc<Gitignore>>,
    ) {
        let mut parent_entry = self
            .entries_by_path
            .get(&PathKey(parent_path.clone()), &())
            .unwrap()
            .clone();
        if let Some(ignore) = ignore {
            self.ignores.insert(parent_path, (ignore, self.scan_id));
        }
        if matches!(parent_entry.kind, EntryKind::PendingDir) {
            parent_entry.kind = EntryKind::Dir;
        } else {
            // Callers only populate directories that are still pending.
            unreachable!();
        }

        let mut entries_by_path_edits = vec![Edit::Insert(parent_entry)];
        let mut entries_by_id_edits = Vec::new();

        for mut entry in entries {
            // Preserve ids for entries that existed before this rescan.
            self.reuse_entry_id(&mut entry);
            entries_by_id_edits.push(Edit::Insert(PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                is_ignored: entry.is_ignored,
                scan_id: self.scan_id,
            }));
            entries_by_path_edits.push(Edit::Insert(entry));
        }

        self.entries_by_path.edit(entries_by_path_edits, &());
        self.entries_by_id.edit(entries_by_id_edits, &());
    }
1815
1816 fn reuse_entry_id(&mut self, entry: &mut Entry) {
1817 if let Some(removed_entry_id) = self.removed_entry_ids.remove(&entry.inode) {
1818 entry.id = removed_entry_id;
1819 } else if let Some(existing_entry) = self.entry_for_path(&entry.path) {
1820 entry.id = existing_entry.id;
1821 }
1822 }
1823
    /// Removes `path` and its entire subtree from both indices. Removed
    /// entries' inodes are remembered so a file that reappears can keep its
    /// id. If the removed path was a `.gitignore`, the parent directory's
    /// ignore record is stamped with the current scan id.
    fn remove_path(&mut self, path: &Path) {
        let mut new_entries;
        let removed_entries;
        {
            // Partition the path-ordered tree into [before `path`],
            // [`path`'s subtree], and [everything after], keeping the first
            // and last pieces.
            let mut cursor = self.entries_by_path.cursor::<TraversalProgress>();
            new_entries = cursor.slice(&TraversalTarget::Path(path), Bias::Left, &());
            removed_entries = cursor.slice(&TraversalTarget::PathSuccessor(path), Bias::Left, &());
            new_entries.push_tree(cursor.suffix(&()), &());
        }
        self.entries_by_path = new_entries;

        let mut entries_by_id_edits = Vec::new();
        for entry in removed_entries.cursor::<()>() {
            // Record the highest removed id per inode for later reuse.
            let removed_entry_id = self
                .removed_entry_ids
                .entry(entry.inode)
                .or_insert(entry.id);
            *removed_entry_id = cmp::max(*removed_entry_id, entry.id);
            entries_by_id_edits.push(Edit::Remove(entry.id));
        }
        self.entries_by_id.edit(entries_by_id_edits, &());

        if path.file_name() == Some(&GITIGNORE) {
            if let Some((_, scan_id)) = self.ignores.get_mut(path.parent().unwrap()) {
                *scan_id = self.scan_id;
            }
        }
    }
1852
    /// Builds the stack of `.gitignore` matchers that applies to `path` by
    /// walking its ancestor directories from the root downward. Short-circuits
    /// to an "ignore everything" stack as soon as any ancestor (or the path
    /// itself) is ignored.
    fn ignore_stack_for_path(&self, path: &Path, is_dir: bool) -> Arc<IgnoreStack> {
        // Collect ancestors (nearest first), pairing each with its ignore
        // file if one is registered.
        let mut new_ignores = Vec::new();
        for ancestor in path.ancestors().skip(1) {
            if let Some((ignore, _)) = self.ignores.get(ancestor) {
                new_ignores.push((ancestor, Some(ignore.clone())));
            } else {
                new_ignores.push((ancestor, None));
            }
        }

        // Replay them root-first, so outer ignore files take effect before
        // inner ones.
        let mut ignore_stack = IgnoreStack::none();
        for (parent_path, ignore) in new_ignores.into_iter().rev() {
            if ignore_stack.is_path_ignored(&parent_path, true) {
                ignore_stack = IgnoreStack::all();
                break;
            } else if let Some(ignore) = ignore {
                ignore_stack = ignore_stack.append(Arc::from(parent_path), ignore);
            }
        }

        if ignore_stack.is_path_ignored(path, is_dir) {
            ignore_stack = IgnoreStack::all();
        }

        ignore_stack
    }
1879}
1880
1881impl fmt::Debug for Snapshot {
1882 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1883 for entry in self.entries_by_path.cursor::<()>() {
1884 for _ in entry.path.ancestors().skip(1) {
1885 write!(f, " ")?;
1886 }
1887 writeln!(f, "{:?} (inode: {})", entry.path, entry.inode)?;
1888 }
1889 Ok(())
1890 }
1891}
1892
/// A handle to a file within a worktree, usable from buffers on either the
/// local or remote side.
#[derive(Clone, PartialEq)]
pub struct File {
    // `None` once the underlying entry has been deleted (see `is_deleted`).
    entry_id: Option<usize>,
    worktree: ModelHandle<Worktree>,
    // Absolute path of the worktree root on the host machine.
    worktree_path: Arc<Path>,
    pub path: Arc<Path>,
    pub mtime: SystemTime,
    // False for files mirrored from a collaborator's machine.
    is_local: bool,
}
1902
1903impl language::File for File {
    /// The model id of the owning worktree.
    fn worktree_id(&self) -> usize {
        self.worktree.id()
    }

    /// The underlying entry's id; `None` if the file was deleted.
    fn entry_id(&self) -> Option<usize> {
        self.entry_id
    }

    /// Last-known modification time of the file.
    fn mtime(&self) -> SystemTime {
        self.mtime
    }

    /// Worktree-relative path of the file.
    fn path(&self) -> &Arc<Path> {
        &self.path
    }
1919
1920 fn abs_path(&self) -> Option<PathBuf> {
1921 if self.is_local {
1922 Some(self.worktree_path.join(&self.path))
1923 } else {
1924 None
1925 }
1926 }
1927
1928 fn full_path(&self) -> PathBuf {
1929 let mut full_path = PathBuf::new();
1930 if let Some(worktree_name) = self.worktree_path.file_name() {
1931 full_path.push(worktree_name);
1932 }
1933 full_path.push(&self.path);
1934 full_path
1935 }
1936
    /// Returns the last component of this handle's absolute path. If this handle refers to the root
    /// of its worktree, then this method will return the name of the worktree itself.
    fn file_name<'a>(&'a self) -> Option<OsString> {
        self.path
            .file_name()
            .or_else(|| self.worktree_path.file_name())
            .map(Into::into)
    }

    /// A file is considered deleted once its entry id has been cleared.
    fn is_deleted(&self) -> bool {
        self.entry_id.is_none()
    }
1949
    /// Saves the buffer's contents. Local worktrees write to disk (and, when
    /// shared, notify the server that the buffer was saved); remote worktrees
    /// ask the host to save on their behalf. Resolves to the saved version
    /// and resulting mtime.
    fn save(
        &self,
        buffer_id: u64,
        text: Rope,
        version: clock::Global,
        cx: &mut MutableAppContext,
    ) -> Task<Result<(clock::Global, SystemTime)>> {
        self.worktree.update(cx, |worktree, cx| match worktree {
            Worktree::Local(worktree) => {
                let rpc = worktree.rpc.clone();
                // `Some` only while this worktree is shared.
                let worktree_id = *worktree.remote_id.borrow();
                let save = worktree.save(self.path.clone(), text, cx);
                cx.background().spawn(async move {
                    let entry = save.await?;
                    if let Some(worktree_id) = worktree_id {
                        rpc.send(proto::BufferSaved {
                            worktree_id,
                            buffer_id,
                            version: (&version).into(),
                            mtime: Some(entry.mtime.into()),
                        })
                        .await?;
                    }
                    Ok((version, entry.mtime))
                })
            }
            Worktree::Remote(worktree) => {
                let rpc = worktree.client.clone();
                let worktree_id = worktree.remote_id;
                cx.foreground().spawn(async move {
                    let response = rpc
                        .request(proto::SaveBuffer {
                            worktree_id,
                            buffer_id,
                        })
                        .await?;
                    // The host reports the authoritative version and mtime.
                    let version = response.version.try_into()?;
                    let mtime = response
                        .mtime
                        .ok_or_else(|| anyhow!("missing mtime"))?
                        .into();
                    Ok((version, mtime))
                })
            }
        })
    }
1996
1997 fn load_local(&self, cx: &AppContext) -> Option<Task<Result<String>>> {
1998 let worktree = self.worktree.read(cx).as_local()?;
1999 let abs_path = worktree.absolutize(&self.path);
2000 let fs = worktree.fs.clone();
2001 Some(
2002 cx.background()
2003 .spawn(async move { fs.load(&abs_path).await }),
2004 )
2005 }
2006
    /// Forwards a buffer edit operation to the worktree for broadcast.
    fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) {
        self.worktree.update(cx, |worktree, cx| {
            worktree.send_buffer_update(buffer_id, operation, cx);
        });
    }
2012
    /// Called when the buffer is dropped. For remote worktrees, tells the
    /// host to close its copy of the buffer; errors are logged, not raised.
    fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) {
        self.worktree.update(cx, |worktree, cx| {
            if let Worktree::Remote(worktree) = worktree {
                let worktree_id = worktree.remote_id;
                let rpc = worktree.client.clone();
                cx.background()
                    .spawn(async move {
                        if let Err(error) = rpc
                            .send(proto::CloseBuffer {
                                worktree_id,
                                buffer_id,
                            })
                            .await
                        {
                            log::error!("error closing remote buffer: {}", error);
                        }
                    })
                    .detach();
            }
        });
    }
2034
    /// Clones this handle behind the `language::File` trait object.
    fn boxed_clone(&self) -> Box<dyn language::File> {
        Box::new(self.clone())
    }

    /// Enables downcasting back to the concrete `File` type.
    fn as_any(&self) -> &dyn Any {
        self
    }
2042}
2043
/// A single file or directory within a worktree snapshot.
#[derive(Clone, Debug)]
pub struct Entry {
    // Stable identifier, preserved across rescans where possible.
    pub id: usize,
    pub kind: EntryKind,
    // Path relative to the worktree root; empty for the root itself.
    pub path: Arc<Path>,
    pub inode: u64,
    pub mtime: SystemTime,
    pub is_symlink: bool,
    pub is_ignored: bool,
}
2054
/// The kind of a worktree entry.
#[derive(Clone, Debug)]
pub enum EntryKind {
    // A directory whose children have not yet been scanned.
    PendingDir,
    Dir,
    // A file, carrying the character bag used for fuzzy matching.
    File(CharBag),
}
2061
impl Entry {
    /// Builds an entry from filesystem metadata, assigning a fresh id from
    /// the shared counter. New entries start out not ignored.
    fn new(
        path: Arc<Path>,
        metadata: &fs::Metadata,
        next_entry_id: &AtomicUsize,
        root_char_bag: CharBag,
    ) -> Self {
        Self {
            id: next_entry_id.fetch_add(1, SeqCst),
            kind: if metadata.is_dir {
                // Directories are "pending" until their children are scanned.
                EntryKind::PendingDir
            } else {
                EntryKind::File(char_bag_for_path(root_char_bag, &path))
            },
            path,
            inode: metadata.inode,
            mtime: metadata.mtime,
            is_symlink: metadata.is_symlink,
            is_ignored: false,
        }
    }

    /// True for directories, including ones not yet fully scanned.
    pub fn is_dir(&self) -> bool {
        matches!(self.kind, EntryKind::Dir | EntryKind::PendingDir)
    }

    /// True for regular file entries.
    pub fn is_file(&self) -> bool {
        matches!(self.kind, EntryKind::File(_))
    }
}
2092
2093impl sum_tree::Item for Entry {
2094 type Summary = EntrySummary;
2095
2096 fn summary(&self) -> Self::Summary {
2097 let visible_count = if self.is_ignored { 0 } else { 1 };
2098 let file_count;
2099 let visible_file_count;
2100 if self.is_file() {
2101 file_count = 1;
2102 visible_file_count = visible_count;
2103 } else {
2104 file_count = 0;
2105 visible_file_count = 0;
2106 }
2107
2108 EntrySummary {
2109 max_path: self.path.clone(),
2110 count: 1,
2111 visible_count,
2112 file_count,
2113 visible_file_count,
2114 }
2115 }
2116}
2117
// Entries are keyed by their worktree-relative path.
impl sum_tree::KeyedItem for Entry {
    type Key = PathKey;

    fn key(&self) -> Self::Key {
        PathKey(self.path.clone())
    }
}
2125
/// Aggregate statistics for a subtree of `Entry` items.
#[derive(Clone, Debug)]
pub struct EntrySummary {
    // Greatest path in the subtree (paths are the sort key).
    max_path: Arc<Path>,
    // Total number of entries.
    count: usize,
    // Entries that are not ignored.
    visible_count: usize,
    // File entries only.
    file_count: usize,
    // File entries that are not ignored.
    visible_file_count: usize,
}

impl Default for EntrySummary {
    fn default() -> Self {
        Self {
            max_path: Arc::from(Path::new("")),
            count: 0,
            visible_count: 0,
            file_count: 0,
            visible_file_count: 0,
        }
    }
}
2146
2147impl sum_tree::Summary for EntrySummary {
2148 type Context = ();
2149
2150 fn add_summary(&mut self, rhs: &Self, _: &()) {
2151 self.max_path = rhs.max_path.clone();
2152 self.visible_count += rhs.visible_count;
2153 self.file_count += rhs.file_count;
2154 self.visible_file_count += rhs.visible_file_count;
2155 }
2156}
2157
/// Secondary-index record mapping an entry id back to its path, with the scan
/// id of the pass that last touched it (used by `build_update` diffing).
#[derive(Clone, Debug)]
struct PathEntry {
    id: usize,
    path: Arc<Path>,
    is_ignored: bool,
    scan_id: usize,
}

impl sum_tree::Item for PathEntry {
    type Summary = PathEntrySummary;

    fn summary(&self) -> Self::Summary {
        PathEntrySummary { max_id: self.id }
    }
}

// Path entries are keyed (and therefore ordered) by entry id.
impl sum_tree::KeyedItem for PathEntry {
    type Key = usize;

    fn key(&self) -> Self::Key {
        self.id
    }
}
2181
/// Summary for the id-ordered index: just the greatest id in the subtree.
#[derive(Clone, Debug, Default)]
struct PathEntrySummary {
    max_id: usize,
}

impl sum_tree::Summary for PathEntrySummary {
    type Context = ();

    fn add_summary(&mut self, summary: &Self, _: &Self::Context) {
        // Items are ordered by id, so the rightmost max always wins.
        self.max_id = summary.max_id;
    }
}

// Allows seeking the id index by plain `usize` ids.
impl<'a> sum_tree::Dimension<'a, PathEntrySummary> for usize {
    fn add_summary(&mut self, summary: &'a PathEntrySummary, _: &()) {
        *self = summary.max_id;
    }
}
2200
/// Ordering key for the path-ordered entry tree.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct PathKey(Arc<Path>);

impl Default for PathKey {
    fn default() -> Self {
        // The empty path sorts before all others (it is the root entry's key).
        Self(Path::new("").into())
    }
}

// Allows seeking the path-ordered tree by `PathKey`.
impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.0 = summary.max_path.clone();
    }
}
2215
/// Scans a worktree's directory hierarchy off the main thread, writing
/// results into the shared snapshot and reporting progress over `notify`.
struct BackgroundScanner {
    fs: Arc<dyn Fs>,
    // Shared with the foreground worktree, which polls it for changes.
    snapshot: Arc<Mutex<Snapshot>>,
    // Publishes Scanning/Idle/Err state transitions.
    notify: Sender<ScanState>,
    executor: Arc<executor::Background>,
}
2222
2223impl BackgroundScanner {
    /// Creates a scanner over the given shared snapshot.
    fn new(
        snapshot: Arc<Mutex<Snapshot>>,
        notify: Sender<ScanState>,
        fs: Arc<dyn Fs>,
        executor: Arc<executor::Background>,
    ) -> Self {
        Self {
            fs,
            snapshot,
            notify,
            executor,
        }
    }

    /// The worktree root's absolute path (read under the snapshot lock).
    fn abs_path(&self) -> Arc<Path> {
        self.snapshot.lock().abs_path.clone()
    }

    /// An owned copy of the current shared snapshot.
    fn snapshot(&self) -> Snapshot {
        self.snapshot.lock().clone()
    }
2245
    /// Main loop: performs the initial full scan, then processes filesystem
    /// events for as long as the event stream and the `notify` receiver stay
    /// alive. Each phase is bracketed by Scanning/Idle notifications; a send
    /// failure means the worktree is gone and the scanner shuts down.
    async fn run(mut self, events_rx: impl Stream<Item = Vec<fsevent::Event>>) {
        if self.notify.send(ScanState::Scanning).await.is_err() {
            return;
        }

        if let Err(err) = self.scan_dirs().await {
            // NOTE(review): after reporting the error, the loop still sends
            // Idle and continues into event processing — confirm intended.
            if self
                .notify
                .send(ScanState::Err(Arc::new(err)))
                .await
                .is_err()
            {
                return;
            }
        }

        if self.notify.send(ScanState::Idle).await.is_err() {
            return;
        }

        futures::pin_mut!(events_rx);
        while let Some(events) = events_rx.next().await {
            if self.notify.send(ScanState::Scanning).await.is_err() {
                break;
            }

            if !self.process_events(events).await {
                break;
            }

            if self.notify.send(ScanState::Idle).await.is_err() {
                break;
            }
        }
    }
2281
    /// Performs the initial recursive scan of the worktree root using a work
    /// queue serviced by one task per CPU. Does nothing if the root is not a
    /// directory (single-file worktrees need no recursion).
    async fn scan_dirs(&mut self) -> Result<()> {
        let root_char_bag;
        let next_entry_id;
        let is_dir;
        {
            // Copy what we need out of the snapshot, then release the lock
            // before spawning the scan tasks.
            let snapshot = self.snapshot.lock();
            root_char_bag = snapshot.root_char_bag;
            next_entry_id = snapshot.next_entry_id.clone();
            is_dir = snapshot.root_entry().map_or(false, |e| e.is_dir())
        };

        if is_dir {
            let path: Arc<Path> = Arc::from(Path::new(""));
            let abs_path = self.abs_path();
            let (tx, rx) = channel::unbounded();
            // Seed the queue with the root; each job carries a clone of the
            // sender so directories can enqueue their subdirectories.
            tx.send(ScanJob {
                abs_path: abs_path.to_path_buf(),
                path,
                ignore_stack: IgnoreStack::none(),
                scan_queue: tx.clone(),
            })
            .await
            .unwrap();
            // Drop our sender so the queue closes once all jobs finish.
            drop(tx);

            self.executor
                .scoped(|scope| {
                    for _ in 0..self.executor.num_cpus() {
                        scope.spawn(async {
                            while let Ok(job) = rx.recv().await {
                                if let Err(err) = self
                                    .scan_dir(root_char_bag, next_entry_id.clone(), &job)
                                    .await
                                {
                                    log::error!("error scanning {:?}: {}", job.abs_path, err);
                                }
                            }
                        });
                    }
                })
                .await;
        }

        Ok(())
    }
2327
    /// Scans a single directory: builds entries for each child, computes
    /// their ignore status, populates the snapshot, and enqueues child
    /// directories for further scanning.
    async fn scan_dir(
        &self,
        root_char_bag: CharBag,
        next_entry_id: Arc<AtomicUsize>,
        job: &ScanJob,
    ) -> Result<()> {
        let mut new_entries: Vec<Entry> = Vec::new();
        let mut new_jobs: Vec<ScanJob> = Vec::new();
        let mut ignore_stack = job.ignore_stack.clone();
        let mut new_ignore = None;

        let mut child_paths = self.fs.read_dir(&job.abs_path).await?;
        while let Some(child_abs_path) = child_paths.next().await {
            let child_abs_path = match child_abs_path {
                Ok(child_abs_path) => child_abs_path,
                Err(error) => {
                    log::error!("error processing entry {:?}", error);
                    continue;
                }
            };
            let child_name = child_abs_path.file_name().unwrap();
            let child_path: Arc<Path> = job.path.join(child_name).into();
            // Entries that vanish between listing and stat are skipped.
            let child_metadata = match self.fs.metadata(&child_abs_path).await? {
                Some(metadata) => metadata,
                None => continue,
            };

            // If we find a .gitignore, add it to the stack of ignores used to determine which paths are ignored
            if child_name == *GITIGNORE {
                match build_gitignore(&child_abs_path, self.fs.as_ref()) {
                    Ok(ignore) => {
                        let ignore = Arc::new(ignore);
                        ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
                        new_ignore = Some(ignore);
                    }
                    Err(error) => {
                        log::error!(
                            "error loading .gitignore file {:?} - {:?}",
                            child_name,
                            error
                        );
                    }
                }

                // Update ignore status of any child entries we've already processed to reflect the
                // ignore file in the current directory. Because `.gitignore` starts with a `.`,
                // there should rarely be too numerous. Update the ignore stack associated with any
                // new jobs as well.
                // (Relies on `new_jobs` and the directory entries in
                // `new_entries` having been pushed in the same order.)
                let mut new_jobs = new_jobs.iter_mut();
                for entry in &mut new_entries {
                    entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
                    if entry.is_dir() {
                        new_jobs.next().unwrap().ignore_stack = if entry.is_ignored {
                            IgnoreStack::all()
                        } else {
                            ignore_stack.clone()
                        };
                    }
                }
            }

            let mut child_entry = Entry::new(
                child_path.clone(),
                &child_metadata,
                &next_entry_id,
                root_char_bag,
            );

            if child_metadata.is_dir {
                let is_ignored = ignore_stack.is_path_ignored(&child_path, true);
                child_entry.is_ignored = is_ignored;
                new_entries.push(child_entry);
                new_jobs.push(ScanJob {
                    abs_path: child_abs_path,
                    path: child_path,
                    // Children of an ignored directory are ignored wholesale.
                    ignore_stack: if is_ignored {
                        IgnoreStack::all()
                    } else {
                        ignore_stack.clone()
                    },
                    scan_queue: job.scan_queue.clone(),
                });
            } else {
                child_entry.is_ignored = ignore_stack.is_path_ignored(&child_path, false);
                new_entries.push(child_entry);
            };
        }

        self.snapshot
            .lock()
            .populate_dir(job.path.clone(), new_entries, new_ignore);
        // Enqueue subdirectories only after the parent has been populated.
        for new_job in new_jobs {
            job.scan_queue.send(new_job).await.unwrap();
        }

        Ok(())
    }
2425
    /// Applies a batch of file-system events to the snapshot: affected paths are
    /// removed, re-stat'ed, and re-inserted, and any directories involved are
    /// rescanned in parallel. Finishes by refreshing ignore statuses.
    ///
    /// Returns `false` when the root path can no longer be canonicalized
    /// (e.g. the worktree root itself was deleted).
    async fn process_events(&mut self, mut events: Vec<fsevent::Event>) -> bool {
        // Work on a private copy of the snapshot so observers never see
        // intermediate state; it is swapped back in near the end.
        let mut snapshot = self.snapshot();
        snapshot.scan_id += 1;

        let root_abs_path = if let Ok(abs_path) = self.fs.canonicalize(&snapshot.abs_path).await {
            abs_path
        } else {
            return false;
        };
        let root_char_bag = snapshot.root_char_bag;
        let next_entry_id = snapshot.next_entry_id.clone();

        // Collapse events for a path's descendants into the event for the path
        // itself; the rescan below covers the descendants anyway.
        events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
        events.dedup_by(|a, b| a.path.starts_with(&b.path));

        // First pass: remove every affected path from the snapshot.
        for event in &events {
            match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => snapshot.remove_path(&path),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            }
        }

        // Second pass: re-stat each path; entries that still exist are
        // re-inserted, and directories are queued for a parallel rescan below.
        let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
        for event in events {
            let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => Arc::from(path.to_path_buf()),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            };

            match self.fs.metadata(&event.path).await {
                Ok(Some(metadata)) => {
                    let ignore_stack = snapshot.ignore_stack_for_path(&path, metadata.is_dir);
                    let mut fs_entry = Entry::new(
                        path.clone(),
                        &metadata,
                        snapshot.next_entry_id.as_ref(),
                        snapshot.root_char_bag,
                    );
                    fs_entry.is_ignored = ignore_stack.is_all();
                    snapshot.insert_entry(fs_entry, self.fs.as_ref());
                    if metadata.is_dir {
                        scan_queue_tx
                            .send(ScanJob {
                                abs_path: event.path,
                                path,
                                ignore_stack,
                                scan_queue: scan_queue_tx.clone(),
                            })
                            .await
                            .unwrap();
                    }
                }
                // The path was deleted; the first pass already removed it.
                Ok(None) => {}
                Err(err) => {
                    // TODO - create a special 'error' entry in the entries tree to mark this
                    log::error!("error reading file on event {:?}", err);
                }
            }
        }

        *self.snapshot.lock() = snapshot;

        // Scan any directories that were created as part of this event batch.
        // Dropping the last external sender lets the worker loops terminate
        // once all (recursively enqueued) jobs have been drained.
        drop(scan_queue_tx);
        self.executor
            .scoped(|scope| {
                for _ in 0..self.executor.num_cpus() {
                    scope.spawn(async {
                        while let Ok(job) = scan_queue_rx.recv().await {
                            if let Err(err) = self
                                .scan_dir(root_char_bag, next_entry_id.clone(), &job)
                                .await
                            {
                                log::error!("error scanning {:?}: {}", job.abs_path, err);
                            }
                        }
                    });
                }
            })
            .await;

        // Attempt to detect renames only over a single batch of file-system events.
        self.snapshot.lock().removed_entry_ids.clear();

        self.update_ignore_statuses().await;
        true
    }
2527
    /// Recomputes `is_ignored` for the subtrees whose `.gitignore` files changed
    /// during the current scan, and drops cached ignores whose `.gitignore` file
    /// no longer exists. The per-directory work is fanned out across worker tasks
    /// via a job queue.
    async fn update_ignore_statuses(&self) {
        let mut snapshot = self.snapshot();

        let mut ignores_to_update = Vec::new();
        let mut ignores_to_delete = Vec::new();
        for (parent_path, (_, scan_id)) in &snapshot.ignores {
            // An ignore touched in the current scan requires its subtree to be re-evaluated.
            if *scan_id == snapshot.scan_id && snapshot.entry_for_path(parent_path).is_some() {
                ignores_to_update.push(parent_path.clone());
            }

            // The cached ignore is stale if its `.gitignore` file was removed.
            let ignore_path = parent_path.join(&*GITIGNORE);
            if snapshot.entry_for_path(ignore_path).is_none() {
                ignores_to_delete.push(parent_path.clone());
            }
        }

        for parent_path in ignores_to_delete {
            snapshot.ignores.remove(&parent_path);
            self.snapshot.lock().ignores.remove(&parent_path);
        }

        let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
        ignores_to_update.sort_unstable();
        let mut ignores_to_update = ignores_to_update.into_iter().peekable();
        while let Some(parent_path) = ignores_to_update.next() {
            // Skip descendants of a path that is already queued; the ancestor's job
            // recursively enqueues work for its children anyway.
            while ignores_to_update
                .peek()
                .map_or(false, |p| p.starts_with(&parent_path))
            {
                ignores_to_update.next().unwrap();
            }

            let ignore_stack = snapshot.ignore_stack_for_path(&parent_path, true);
            ignore_queue_tx
                .send(UpdateIgnoreStatusJob {
                    path: parent_path,
                    ignore_stack,
                    ignore_queue: ignore_queue_tx.clone(),
                })
                .await
                .unwrap();
        }
        // Dropping the last external sender lets the worker loops below end once
        // all queued (and recursively enqueued) jobs are drained.
        drop(ignore_queue_tx);

        self.executor
            .scoped(|scope| {
                for _ in 0..self.executor.num_cpus() {
                    scope.spawn(async {
                        while let Ok(job) = ignore_queue_rx.recv().await {
                            self.update_ignore_status(job, &snapshot).await;
                        }
                    });
                }
            })
            .await;
    }
2584
    /// Re-evaluates the ignore status of the direct children of `job.path`,
    /// enqueueing a follow-up job for each child directory, and writes any
    /// entries whose status changed back into the shared snapshot.
    async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) {
        // If this directory has its own `.gitignore`, layer it on top of the
        // stack inherited from the job.
        let mut ignore_stack = job.ignore_stack;
        if let Some((ignore, _)) = snapshot.ignores.get(&job.path) {
            ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
        }

        let mut entries_by_id_edits = Vec::new();
        let mut entries_by_path_edits = Vec::new();
        for mut entry in snapshot.child_entries(&job.path).cloned() {
            let was_ignored = entry.is_ignored;
            entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
            if entry.is_dir() {
                let child_ignore_stack = if entry.is_ignored {
                    IgnoreStack::all()
                } else {
                    ignore_stack.clone()
                };
                job.ignore_queue
                    .send(UpdateIgnoreStatusJob {
                        path: entry.path.clone(),
                        ignore_stack: child_ignore_stack,
                        ignore_queue: job.ignore_queue.clone(),
                    })
                    .await
                    .unwrap();
            }

            // Only entries whose ignore status actually changed get re-written.
            if entry.is_ignored != was_ignored {
                let mut path_entry = snapshot.entries_by_id.get(&entry.id, &()).unwrap().clone();
                path_entry.scan_id = snapshot.scan_id;
                path_entry.is_ignored = entry.is_ignored;
                entries_by_id_edits.push(Edit::Insert(path_entry));
                entries_by_path_edits.push(Edit::Insert(entry));
            }
        }

        let mut snapshot = self.snapshot.lock();
        snapshot.entries_by_path.edit(entries_by_path_edits, &());
        snapshot.entries_by_id.edit(entries_by_id_edits, &());
    }
2625}
2626
2627async fn refresh_entry(
2628 fs: &dyn Fs,
2629 snapshot: &Mutex<Snapshot>,
2630 path: Arc<Path>,
2631 abs_path: &Path,
2632) -> Result<Entry> {
2633 let root_char_bag;
2634 let next_entry_id;
2635 {
2636 let snapshot = snapshot.lock();
2637 root_char_bag = snapshot.root_char_bag;
2638 next_entry_id = snapshot.next_entry_id.clone();
2639 }
2640 let entry = Entry::new(
2641 path,
2642 &fs.metadata(abs_path)
2643 .await?
2644 .ok_or_else(|| anyhow!("could not read saved file metadata"))?,
2645 &next_entry_id,
2646 root_char_bag,
2647 );
2648 Ok(snapshot.lock().insert_entry(entry, fs))
2649}
2650
2651fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
2652 let mut result = root_char_bag;
2653 result.extend(
2654 path.to_string_lossy()
2655 .chars()
2656 .map(|c| c.to_ascii_lowercase()),
2657 );
2658 result
2659}
2660
/// Unit of work for the parallel directory scan: one directory to read, plus
/// the shared queue used to enqueue jobs for its subdirectories.
struct ScanJob {
    // Absolute path of the directory on disk.
    abs_path: PathBuf,
    // Path of the directory relative to the worktree root.
    path: Arc<Path>,
    // Ignore rules in effect for this directory's children.
    ignore_stack: Arc<IgnoreStack>,
    // Sender for the queue shared by all scan workers.
    scan_queue: Sender<ScanJob>,
}
2667
/// Unit of work for re-evaluating ignore statuses: one directory whose children
/// need their `is_ignored` flags recomputed.
struct UpdateIgnoreStatusJob {
    // Path of the directory relative to the worktree root.
    path: Arc<Path>,
    // Ignore rules inherited from this directory's ancestors.
    ignore_stack: Arc<IgnoreStack>,
    // Sender for the queue shared by all ignore-update workers.
    ignore_queue: Sender<UpdateIgnoreStatusJob>,
}
2673
pub trait WorktreeHandle {
    /// Test-only helper: performs a benign mutation of the worktree's directory
    /// and waits until the worktree has observed it, ensuring any stale
    /// file-system events have been fully processed first.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()>;
}
2681
impl WorktreeHandle for ModelHandle<Worktree> {
    // The worktree's FS event stream sometimes delivers "redundant" events for FS changes that
    // occurred before the worktree was constructed. These events can cause the worktree to perform
    // extra directory scans, and emit extra scan-state notifications.
    //
    // This function mutates the worktree's directory and waits for those mutations to be picked up,
    // to ensure that all redundant FS events have already been processed.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()> {
        use smol::future::FutureExt;

        let filename = "fs-event-sentinel";
        let root_path = cx.read(|cx| self.read(cx).abs_path.clone());
        let tree = self.clone();
        async move {
            // Create and then delete a sentinel file, waiting for the worktree
            // to observe each step; by then, older events must have been drained.
            std::fs::write(root_path.join(filename), "").unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_some())
                .await;

            std::fs::remove_file(root_path.join(filename)).unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_none())
                .await;

            cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
                .await;
        }
        .boxed_local()
    }
}
2714
/// Seek dimension accumulated while traversing the entries `SumTree`: tracks
/// the maximum path visited plus running entry counts, broken down by
/// directory/file and ignored/visible.
#[derive(Clone, Debug)]
struct TraversalProgress<'a> {
    // Greatest entry path at or before the current position.
    max_path: &'a Path,
    // Total number of entries.
    count: usize,
    // Entries that are not ignored.
    visible_count: usize,
    // File (non-directory) entries.
    file_count: usize,
    // File entries that are not ignored.
    visible_file_count: usize,
}
2723
2724impl<'a> TraversalProgress<'a> {
2725 fn count(&self, include_dirs: bool, include_ignored: bool) -> usize {
2726 match (include_ignored, include_dirs) {
2727 (true, true) => self.count,
2728 (true, false) => self.file_count,
2729 (false, true) => self.visible_count,
2730 (false, false) => self.visible_file_count,
2731 }
2732 }
2733}
2734
impl<'a> sum_tree::Dimension<'a, EntrySummary> for TraversalProgress<'a> {
    // Accumulates a summary's entry counts and tracks the greatest path seen,
    // enabling `Traversal` to seek both by count and by path.
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.max_path = summary.max_path.as_ref();
        self.count += summary.count;
        self.visible_count += summary.visible_count;
        self.file_count += summary.file_count;
        self.visible_file_count += summary.visible_file_count;
    }
}
2744
impl<'a> Default for TraversalProgress<'a> {
    // The zero progress value: positioned before any entry, with an empty path
    // and all counts at zero.
    fn default() -> Self {
        Self {
            max_path: Path::new(""),
            count: 0,
            visible_count: 0,
            file_count: 0,
            visible_file_count: 0,
        }
    }
}
2756
/// A filtered, ordered iterator over worktree entries, backed by a `SumTree`
/// cursor. The two flags control whether directories and ignored entries are
/// yielded.
pub struct Traversal<'a> {
    cursor: sum_tree::Cursor<'a, Entry, TraversalProgress<'a>>,
    // When false, ignored entries are skipped.
    include_ignored: bool,
    // When false, directory entries are skipped.
    include_dirs: bool,
}
2762
impl<'a> Traversal<'a> {
    /// Advances past the current entry to the next one matching the filters.
    /// Returns `false` when the traversal is exhausted.
    pub fn advance(&mut self) -> bool {
        self.advance_to_offset(self.offset() + 1)
    }

    /// Seeks forward until `offset` filter-matching entries precede the cursor.
    /// Returns `false` when the traversal is exhausted.
    pub fn advance_to_offset(&mut self, offset: usize) -> bool {
        self.cursor.seek_forward(
            &TraversalTarget::Count {
                count: offset,
                include_dirs: self.include_dirs,
                include_ignored: self.include_ignored,
            },
            Bias::Right,
            &(),
        )
    }

    /// Skips the current entry's entire subtree, stopping at the next
    /// filter-matching entry that is not a descendant of the current one.
    /// Keeps seeking until such an entry is found or the tree is exhausted.
    pub fn advance_to_sibling(&mut self) -> bool {
        while let Some(entry) = self.cursor.item() {
            self.cursor.seek_forward(
                &TraversalTarget::PathSuccessor(&entry.path),
                Bias::Left,
                &(),
            );
            if let Some(entry) = self.cursor.item() {
                if (self.include_dirs || !entry.is_dir())
                    && (self.include_ignored || !entry.is_ignored)
                {
                    return true;
                }
            }
        }
        false
    }

    /// The entry the traversal is currently positioned on, if any.
    pub fn entry(&self) -> Option<&'a Entry> {
        self.cursor.item()
    }

    /// Number of filter-matching entries that precede the current position.
    pub fn offset(&self) -> usize {
        self.cursor
            .start()
            .count(self.include_dirs, self.include_ignored)
    }
}
2808
2809impl<'a> Iterator for Traversal<'a> {
2810 type Item = &'a Entry;
2811
2812 fn next(&mut self) -> Option<Self::Item> {
2813 if let Some(item) = self.entry() {
2814 self.advance();
2815 Some(item)
2816 } else {
2817 None
2818 }
2819 }
2820}
2821
/// Seek targets used by `Traversal` to position its cursor within the
/// entries `SumTree`.
#[derive(Debug)]
enum TraversalTarget<'a> {
    // Seek to the entry with exactly this path.
    Path(&'a Path),
    // Seek past the subtree rooted at this path (its next non-descendant).
    PathSuccessor(&'a Path),
    // Seek to the position after `count` filter-matching entries.
    Count {
        count: usize,
        include_ignored: bool,
        include_dirs: bool,
    },
}
2832
2833impl<'a, 'b> SeekTarget<'a, EntrySummary, TraversalProgress<'a>> for TraversalTarget<'b> {
2834 fn cmp(&self, cursor_location: &TraversalProgress<'a>, _: &()) -> Ordering {
2835 match self {
2836 TraversalTarget::Path(path) => path.cmp(&cursor_location.max_path),
2837 TraversalTarget::PathSuccessor(path) => {
2838 if !cursor_location.max_path.starts_with(path) {
2839 Ordering::Equal
2840 } else {
2841 Ordering::Greater
2842 }
2843 }
2844 TraversalTarget::Count {
2845 count,
2846 include_dirs,
2847 include_ignored,
2848 } => Ord::cmp(
2849 count,
2850 &cursor_location.count(*include_dirs, *include_ignored),
2851 ),
2852 }
2853 }
2854}
2855
/// Iterator over the direct children of a single directory, implemented by
/// repeatedly skipping to the next sibling while still inside `parent_path`.
struct ChildEntriesIter<'a> {
    parent_path: &'a Path,
    traversal: Traversal<'a>,
}
2860
2861impl<'a> Iterator for ChildEntriesIter<'a> {
2862 type Item = &'a Entry;
2863
2864 fn next(&mut self) -> Option<Self::Item> {
2865 if let Some(item) = self.traversal.entry() {
2866 if item.path.starts_with(&self.parent_path) {
2867 self.traversal.advance_to_sibling();
2868 return Some(item);
2869 }
2870 }
2871 None
2872 }
2873}
2874
/// Converts a local worktree entry into its wire-format representation.
/// The path is serialized lossily as a UTF-8 string.
impl<'a> From<&'a Entry> for proto::Entry {
    fn from(entry: &'a Entry) -> Self {
        Self {
            id: entry.id as u64,
            is_dir: entry.is_dir(),
            path: entry.path.to_string_lossy().to_string(),
            inode: entry.inode,
            mtime: Some(entry.mtime.into()),
            is_symlink: entry.is_symlink,
            is_ignored: entry.is_ignored,
        }
    }
}
2888
2889impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
2890 type Error = anyhow::Error;
2891
2892 fn try_from((root_char_bag, entry): (&'a CharBag, proto::Entry)) -> Result<Self> {
2893 if let Some(mtime) = entry.mtime {
2894 let kind = if entry.is_dir {
2895 EntryKind::Dir
2896 } else {
2897 let mut char_bag = root_char_bag.clone();
2898 char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase()));
2899 EntryKind::File(char_bag)
2900 };
2901 let path: Arc<Path> = Arc::from(Path::new(&entry.path));
2902 Ok(Entry {
2903 id: entry.id as usize,
2904 kind,
2905 path: path.clone(),
2906 inode: entry.inode,
2907 mtime: mtime.into(),
2908 is_symlink: entry.is_symlink,
2909 is_ignored: entry.is_ignored,
2910 })
2911 } else {
2912 Err(anyhow!(
2913 "missing mtime in remote worktree entry {:?}",
2914 entry.path
2915 ))
2916 }
2917 }
2918}
2919
2920#[cfg(test)]
2921mod tests {
2922 use super::*;
2923 use crate::fs::FakeFs;
2924 use anyhow::Result;
2925 use buffer::Point;
2926 use client::test::FakeServer;
2927 use fs::RealFs;
2928 use language::Diagnostic;
2929 use lsp::Url;
2930 use rand::prelude::*;
2931 use serde_json::json;
2932 use std::{cell::RefCell, rc::Rc};
2933 use std::{
2934 env,
2935 fmt::Write,
2936 time::{SystemTime, UNIX_EPOCH},
2937 };
2938 use util::test::temp_tree;
2939
    // An entry matching a `.gitignore` pattern ("a/b") must be omitted from a
    // traversal that excludes ignored entries.
    #[gpui::test]
    async fn test_traversal(cx: gpui::TestAppContext) {
        let fs = FakeFs::new();
        fs.insert_tree(
            "/root",
            json!({
                ".gitignore": "a/b\n",
                "a": {
                    "b": "",
                    "c": "",
                }
            }),
        )
        .await;

        let tree = Worktree::open_local(
            Client::new(),
            Arc::from(Path::new("/root")),
            Arc::new(fs),
            Default::default(),
            None,
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        // Wait for the initial background scan to finish before asserting.
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;

        tree.read_with(&cx, |tree, _| {
            assert_eq!(
                tree.entries(false)
                    .map(|entry| entry.path.as_ref())
                    .collect::<Vec<_>>(),
                vec![
                    Path::new(""),
                    Path::new(".gitignore"),
                    Path::new("a"),
                    Path::new("a/c"),
                ]
            );
        })
    }
2982
    // Editing a buffer and saving it must write the buffer's contents to disk.
    #[gpui::test]
    async fn test_save_file(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));
        let tree = Worktree::open_local(
            Client::new(),
            dir.path(),
            Arc::new(RealFs),
            Default::default(),
            None,
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        // Insert a large amount of text so the save meaningfully changes the file.
        let save = buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        // The file on disk must now match the in-memory buffer.
        let new_text = std::fs::read_to_string(dir.path().join("file1")).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
3011
    // Saving must also work when the worktree's root is a single file rather
    // than a directory; in that case the buffer is opened at the empty path.
    #[gpui::test]
    async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));
        let file_path = dir.path().join("file1");

        let tree = Worktree::open_local(
            Client::new(),
            file_path.clone(),
            Arc::new(RealFs),
            Default::default(),
            None,
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        // A single-file worktree contains exactly one file.
        cx.read(|cx| assert_eq!(tree.read(cx).file_count(), 1));

        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("", cx))
            .await
            .unwrap();
        let save = buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        let new_text = std::fs::read_to_string(file_path).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
3046
    // After files are renamed/deleted on disk, entry ids and open buffers must
    // track the moved files, and a remote replica must converge on the same
    // entries once it applies the update built from the snapshot diff.
    #[gpui::test]
    async fn test_rescan_and_remote_updates(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "a": {
                "file1": "",
                "file2": "",
                "file3": "",
            },
            "b": {
                "c": {
                    "file4": "",
                    "file5": "",
                }
            }
        }));

        let user_id = 5;
        let mut client = Client::new();
        let server = FakeServer::for_client(user_id, &mut client, &cx).await;
        let tree = Worktree::open_local(
            client,
            dir.path(),
            Arc::new(RealFs),
            Default::default(),
            None,
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| {
            let buffer = tree.update(cx, |tree, cx| tree.open_buffer(path, cx));
            async move { buffer.await.unwrap() }
        };
        let id_for_path = |path: &'static str, cx: &gpui::TestAppContext| {
            tree.read_with(cx, |tree, _| {
                tree.entry_for_path(path)
                    .expect(&format!("no entry for path {}", path))
                    .id
            })
        };

        let buffer2 = buffer_for_path("a/file2", &mut cx).await;
        let buffer3 = buffer_for_path("a/file3", &mut cx).await;
        let buffer4 = buffer_for_path("b/c/file4", &mut cx).await;
        let buffer5 = buffer_for_path("b/c/file5", &mut cx).await;

        let file2_id = id_for_path("a/file2", &cx);
        let file3_id = id_for_path("a/file3", &cx);
        let file4_id = id_for_path("b/c/file4", &cx);

        // Wait for the initial scan.
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;

        // Create a remote copy of this worktree.
        let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
        let worktree_id = 1;
        let share_request = tree.update(&mut cx, |tree, cx| {
            tree.as_local().unwrap().share_request(cx)
        });
        let open_worktree = server.receive::<proto::OpenWorktree>().await.unwrap();
        server
            .respond(
                open_worktree.receipt(),
                proto::OpenWorktreeResponse { worktree_id: 1 },
            )
            .await;

        let remote = Worktree::remote(
            proto::JoinWorktreeResponse {
                worktree: share_request.await.unwrap().worktree,
                replica_id: 1,
                peers: Vec::new(),
            },
            Client::new(),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        // No buffer has been modified yet.
        cx.read(|cx| {
            assert!(!buffer2.read(cx).is_dirty());
            assert!(!buffer3.read(cx).is_dirty());
            assert!(!buffer4.read(cx).is_dirty());
            assert!(!buffer5.read(cx).is_dirty());
        });

        // Rename and delete files and directories.
        tree.flush_fs_events(&cx).await;
        std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
        std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
        std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap();
        std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
        tree.flush_fs_events(&cx).await;

        let expected_paths = vec![
            "a",
            "a/file1",
            "a/file2.new",
            "b",
            "d",
            "d/file3",
            "d/file4",
        ];

        cx.read(|app| {
            assert_eq!(
                tree.read(app)
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );

            // Renamed files keep their original entry ids.
            assert_eq!(id_for_path("a/file2.new", &cx), file2_id);
            assert_eq!(id_for_path("d/file3", &cx), file3_id);
            assert_eq!(id_for_path("d/file4", &cx), file4_id);

            // Open buffers follow their files to the new paths.
            assert_eq!(
                buffer2.read(app).file().unwrap().path().as_ref(),
                Path::new("a/file2.new")
            );
            assert_eq!(
                buffer3.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file3")
            );
            assert_eq!(
                buffer4.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file4")
            );
            assert_eq!(
                buffer5.read(app).file().unwrap().path().as_ref(),
                Path::new("b/c/file5")
            );

            // Only the removed file's buffer is marked deleted.
            assert!(!buffer2.read(app).file().unwrap().is_deleted());
            assert!(!buffer3.read(app).file().unwrap().is_deleted());
            assert!(!buffer4.read(app).file().unwrap().is_deleted());
            assert!(buffer5.read(app).file().unwrap().is_deleted());
        });

        // Update the remote worktree. Check that it becomes consistent with the
        // local worktree.
        remote.update(&mut cx, |remote, cx| {
            let update_message =
                tree.read(cx)
                    .snapshot()
                    .build_update(&initial_snapshot, worktree_id, true);
            remote
                .as_remote_mut()
                .unwrap()
                .snapshot
                .apply_update(update_message)
                .unwrap();

            assert_eq!(
                remote
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );
        });
    }
3213
    // Files created after the initial scan must get correct ignore statuses,
    // both inside tracked and ignored directories; `.git` is always ignored.
    #[gpui::test]
    async fn test_rescan_with_gitignore(cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            ".git": {},
            ".gitignore": "ignored-dir\n",
            "tracked-dir": {
                "tracked-file1": "tracked contents",
            },
            "ignored-dir": {
                "ignored-file1": "ignored contents",
            }
        }));

        let tree = Worktree::open_local(
            Client::new(),
            dir.path(),
            Arc::new(RealFs),
            Default::default(),
            None,
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        tree.flush_fs_events(&cx).await;
        // Statuses from the initial scan.
        cx.read(|cx| {
            let tree = tree.read(cx);
            let tracked = tree.entry_for_path("tracked-dir/tracked-file1").unwrap();
            let ignored = tree.entry_for_path("ignored-dir/ignored-file1").unwrap();
            assert_eq!(tracked.is_ignored, false);
            assert_eq!(ignored.is_ignored, true);
        });

        // Statuses for files created after the initial scan.
        std::fs::write(dir.path().join("tracked-dir/tracked-file2"), "").unwrap();
        std::fs::write(dir.path().join("ignored-dir/ignored-file2"), "").unwrap();
        tree.flush_fs_events(&cx).await;
        cx.read(|cx| {
            let tree = tree.read(cx);
            let dot_git = tree.entry_for_path(".git").unwrap();
            let tracked = tree.entry_for_path("tracked-dir/tracked-file2").unwrap();
            let ignored = tree.entry_for_path("ignored-dir/ignored-file2").unwrap();
            assert_eq!(tracked.is_ignored, false);
            assert_eq!(ignored.is_ignored, true);
            assert_eq!(dot_git.is_ignored, true);
        });
    }
3261
    // Opening a local worktree and connecting must send an `OpenWorktree`
    // request (with the root name and the collaborators from `.zed.toml`);
    // dropping the worktree must send `CloseWorktree`.
    #[gpui::test]
    async fn test_open_and_share_worktree(mut cx: gpui::TestAppContext) {
        let user_id = 100;
        let mut client = Client::new();
        let server = FakeServer::for_client(user_id, &mut client, &cx).await;

        let fs = Arc::new(FakeFs::new());
        fs.insert_tree(
            "/path",
            json!({
                "to": {
                    "the-dir": {
                        ".zed.toml": r#"collaborators = ["friend-1", "friend-2"]"#,
                        "a.txt": "a-contents",
                    },
                },
            }),
        )
        .await;

        let worktree = Worktree::open_local(
            client.clone(),
            "/path/to/the-dir".as_ref(),
            fs,
            Default::default(),
            None,
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        {
            let cx = cx.to_async();
            client.authenticate_and_connect(&cx).await.unwrap();
        }

        let open_worktree = server.receive::<proto::OpenWorktree>().await.unwrap();
        assert_eq!(
            open_worktree.payload,
            proto::OpenWorktree {
                root_name: "the-dir".to_string(),
                collaborator_logins: vec!["friend-1".to_string(), "friend-2".to_string()],
            }
        );

        server
            .respond(
                open_worktree.receipt(),
                proto::OpenWorktreeResponse { worktree_id: 5 },
            )
            .await;
        // The worktree adopts the server-assigned remote id.
        let remote_id = worktree
            .update(&mut cx, |tree, _| tree.as_local().unwrap().next_remote_id())
            .await;
        assert_eq!(remote_id, Some(5));

        cx.update(move |_| drop(worktree));
        server.receive::<proto::CloseWorktree>().await.unwrap();
    }
3321
    // Exercises the buffer dirty-state machine: edit -> Dirtied, save -> Saved,
    // and the interaction between dirtiness and on-disk file deletion.
    #[gpui::test]
    async fn test_buffer_is_dirty(mut cx: gpui::TestAppContext) {
        use std::fs;

        let dir = temp_tree(json!({
            "file1": "abc",
            "file2": "def",
            "file3": "ghi",
        }));
        let tree = Worktree::open_local(
            Client::new(),
            dir.path(),
            Arc::new(RealFs),
            Default::default(),
            None,
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        tree.flush_fs_events(&cx).await;
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;

        let buffer1 = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        let events = Rc::new(RefCell::new(Vec::new()));

        // initially, the buffer isn't dirty.
        buffer1.update(&mut cx, |buffer, cx| {
            cx.subscribe(&buffer1, {
                let events = events.clone();
                move |_, _, event, _| events.borrow_mut().push(event.clone())
            })
            .detach();

            assert!(!buffer.is_dirty());
            assert!(events.borrow().is_empty());

            buffer.edit(vec![1..2], "", cx);
        });

        // after the first edit, the buffer is dirty, and emits a dirtied event.
        buffer1.update(&mut cx, |buffer, cx| {
            assert!(buffer.text() == "ac");
            assert!(buffer.is_dirty());
            assert_eq!(
                *events.borrow(),
                &[language::Event::Edited, language::Event::Dirtied]
            );
            events.borrow_mut().clear();
            buffer.did_save(buffer.version(), buffer.file().unwrap().mtime(), None, cx);
        });

        // after saving, the buffer is not dirty, and emits a saved event.
        buffer1.update(&mut cx, |buffer, cx| {
            assert!(!buffer.is_dirty());
            assert_eq!(*events.borrow(), &[language::Event::Saved]);
            events.borrow_mut().clear();

            buffer.edit(vec![1..1], "B", cx);
            buffer.edit(vec![2..2], "D", cx);
        });

        // after editing again, the buffer is dirty, and emits another dirty event.
        buffer1.update(&mut cx, |buffer, cx| {
            assert!(buffer.text() == "aBDc");
            assert!(buffer.is_dirty());
            assert_eq!(
                *events.borrow(),
                &[
                    language::Event::Edited,
                    language::Event::Dirtied,
                    language::Event::Edited
                ],
            );
            events.borrow_mut().clear();

            // TODO - currently, after restoring the buffer to its
            // previously-saved state, the buffer is still considered dirty.
            buffer.edit(vec![1..3], "", cx);
            assert!(buffer.text() == "ac");
            assert!(buffer.is_dirty());
        });

        assert_eq!(*events.borrow(), &[language::Event::Edited]);

        // When a file is deleted, the buffer is considered dirty.
        let events = Rc::new(RefCell::new(Vec::new()));
        let buffer2 = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("file2", cx))
            .await
            .unwrap();
        buffer2.update(&mut cx, |_, cx| {
            cx.subscribe(&buffer2, {
                let events = events.clone();
                move |_, _, event, _| events.borrow_mut().push(event.clone())
            })
            .detach();
        });

        fs::remove_file(dir.path().join("file2")).unwrap();
        buffer2.condition(&cx, |b, _| b.is_dirty()).await;
        assert_eq!(
            *events.borrow(),
            &[language::Event::Dirtied, language::Event::FileHandleChanged]
        );

        // When a file is already dirty when deleted, we don't emit a Dirtied event.
        let events = Rc::new(RefCell::new(Vec::new()));
        let buffer3 = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("file3", cx))
            .await
            .unwrap();
        buffer3.update(&mut cx, |_, cx| {
            cx.subscribe(&buffer3, {
                let events = events.clone();
                move |_, _, event, _| events.borrow_mut().push(event.clone())
            })
            .detach();
        });

        tree.flush_fs_events(&cx).await;
        buffer3.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "x", cx);
        });
        events.borrow_mut().clear();
        fs::remove_file(dir.path().join("file3")).unwrap();
        buffer3
            .condition(&cx, |_, _| !events.borrow().is_empty())
            .await;
        assert_eq!(*events.borrow(), &[language::Event::FileHandleChanged]);
        cx.read(|cx| assert!(buffer3.read(cx).is_dirty()));
    }
3457
    // A clean buffer reloads (with cursors preserved via diffing) when its file
    // changes on disk; a dirty buffer does not reload and is instead marked as
    // having a conflict.
    #[gpui::test]
    async fn test_buffer_file_changes_on_disk(mut cx: gpui::TestAppContext) {
        use buffer::{Point, Selection, SelectionGoal};
        use std::fs;

        let initial_contents = "aaa\nbbbbb\nc\n";
        let dir = temp_tree(json!({ "the-file": initial_contents }));
        let tree = Worktree::open_local(
            Client::new(),
            dir.path(),
            Arc::new(RealFs),
            Default::default(),
            None,
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;

        let abs_path = dir.path().join("the-file");
        let buffer = tree
            .update(&mut cx, |tree, cx| {
                tree.open_buffer(Path::new("the-file"), cx)
            })
            .await
            .unwrap();

        // Add a cursor on each row.
        let selection_set_id = buffer.update(&mut cx, |buffer, cx| {
            assert!(!buffer.is_dirty());
            buffer.add_selection_set(
                &(0..3)
                    .map(|row| Selection {
                        id: row as usize,
                        start: Point::new(row, 1),
                        end: Point::new(row, 1),
                        reversed: false,
                        goal: SelectionGoal::None,
                    })
                    .collect::<Vec<_>>(),
                cx,
            )
        });

        // Change the file on disk, adding two new lines of text, and removing
        // one line.
        buffer.read_with(&cx, |buffer, _| {
            assert!(!buffer.is_dirty());
            assert!(!buffer.has_conflict());
        });
        let new_contents = "AAAA\naaa\nBB\nbbbbb\n";
        fs::write(&abs_path, new_contents).unwrap();

        // Because the buffer was not modified, it is reloaded from disk. Its
        // contents are edited according to the diff between the old and new
        // file contents.
        buffer
            .condition(&cx, |buffer, _| buffer.text() == new_contents)
            .await;

        buffer.update(&mut cx, |buffer, _| {
            assert_eq!(buffer.text(), new_contents);
            assert!(!buffer.is_dirty());
            assert!(!buffer.has_conflict());

            // The diff-based reload keeps each cursor on its original line's
            // new position.
            let cursor_positions = buffer
                .selection_set(selection_set_id)
                .unwrap()
                .point_selections(&*buffer)
                .map(|selection| {
                    assert_eq!(selection.start, selection.end);
                    selection.start
                })
                .collect::<Vec<_>>();
            assert_eq!(
                cursor_positions,
                [Point::new(1, 1), Point::new(3, 1), Point::new(4, 0)]
            );
        });

        // Modify the buffer
        buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(vec![0..0], " ", cx);
            assert!(buffer.is_dirty());
            assert!(!buffer.has_conflict());
        });

        // Change the file on disk again, adding blank lines to the beginning.
        fs::write(&abs_path, "\n\n\nAAAA\naaa\nBB\nbbbbb\n").unwrap();

        // Because the buffer is modified, it doesn't reload from disk, but is
        // marked as having a conflict.
        buffer
            .condition(&cx, |buffer, _| buffer.has_conflict())
            .await;
    }
3555
    #[gpui::test]
    async fn test_language_server_diagnostics(mut cx: gpui::TestAppContext) {
        // A diagnostics notification published by the language server before
        // the corresponding buffer is opened must still be surfaced on the
        // buffer once it is opened.
        let (language_server, mut fake_lsp) = LanguageServer::fake(cx.background()).await;
        let dir = temp_tree(json!({
            "a.rs": "fn a() { A }",
            "b.rs": "const y: i32 = 1",
        }));

        let tree = Worktree::open_local(
            Client::new(),
            dir.path(),
            Arc::new(RealFs),
            Default::default(),
            Some(language_server),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        // Wait for the initial scan so both files are in the snapshot.
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;

        // Publish a diagnostic for a.rs while no buffer for it exists yet.
        fake_lsp
            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
                uri: Url::from_file_path(dir.path().join("a.rs")).unwrap(),
                version: None,
                diagnostics: vec![lsp::Diagnostic {
                    range: lsp::Range::new(lsp::Position::new(0, 9), lsp::Position::new(0, 10)),
                    severity: Some(lsp::DiagnosticSeverity::ERROR),
                    message: "undefined variable 'A'".to_string(),
                    ..Default::default()
                }],
            })
            .await;

        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("a.rs", cx))
            .await
            .unwrap();

        // The previously-published diagnostic appears on the buffer with its
        // range, severity, and message intact.
        buffer.read_with(&cx, |buffer, _| {
            let diagnostics = buffer
                .diagnostics_in_range(0..buffer.len())
                .collect::<Vec<_>>();
            assert_eq!(
                diagnostics,
                &[(
                    Point::new(0, 9)..Point::new(0, 10),
                    &Diagnostic {
                        severity: lsp::DiagnosticSeverity::ERROR,
                        message: "undefined variable 'A'".to_string()
                    }
                )]
            )
        });
    }
3611
    #[gpui::test(iterations = 100)]
    fn test_random(mut rng: StdRng) {
        // Fuzz test for BackgroundScanner. It repeatedly mutates a real
        // directory tree and feeds the scanner batches of the resulting
        // events, checking snapshot invariants throughout, then verifies:
        // - the incrementally-updated snapshot matches a from-scratch scan;
        // - saved intermediate snapshots can be caught up to the final state
        //   via build_update/apply_update.
        // OPERATIONS and INITIAL_ENTRIES env vars scale the workload when
        // reproducing failures.
        let operations = env::var("OPERATIONS")
            .map(|o| o.parse().unwrap())
            .unwrap_or(40);
        let initial_entries = env::var("INITIAL_ENTRIES")
            .map(|o| o.parse().unwrap())
            .unwrap_or(20);

        let root_dir = tempdir::TempDir::new("worktree-test").unwrap();
        for _ in 0..initial_entries {
            // insertion_probability 1.0 => always create entries here.
            randomly_mutate_tree(root_dir.path(), 1.0, &mut rng).unwrap();
        }
        log::info!("Generated initial tree");

        let (notify_tx, _notify_rx) = smol::channel::unbounded();
        let fs = Arc::new(RealFs);
        let next_entry_id = Arc::new(AtomicUsize::new(0));
        // Seed a snapshot containing only the root entry; the scanner fills
        // in the rest during scan_dirs.
        let mut initial_snapshot = Snapshot {
            id: 0,
            scan_id: 0,
            abs_path: root_dir.path().into(),
            entries_by_path: Default::default(),
            entries_by_id: Default::default(),
            removed_entry_ids: Default::default(),
            ignores: Default::default(),
            root_name: Default::default(),
            root_char_bag: Default::default(),
            next_entry_id: next_entry_id.clone(),
        };
        initial_snapshot.insert_entry(
            Entry::new(
                Path::new("").into(),
                &smol::block_on(fs.metadata(root_dir.path()))
                    .unwrap()
                    .unwrap(),
                &next_entry_id,
                Default::default(),
            ),
            fs.as_ref(),
        );
        let mut scanner = BackgroundScanner::new(
            Arc::new(Mutex::new(initial_snapshot.clone())),
            notify_tx,
            fs.clone(),
            Arc::new(gpui::executor::Background::new()),
        );
        smol::block_on(scanner.scan_dirs()).unwrap();
        scanner.snapshot().check_invariants();

        let mut events = Vec::new();
        let mut snapshots = Vec::new();
        let mut mutations_len = operations;
        while mutations_len > 1 {
            // Either deliver a random prefix of the buffered events to the
            // scanner, or mutate the tree again — so events arrive late,
            // batched, and interleaved with further changes.
            if !events.is_empty() && rng.gen_bool(0.4) {
                let len = rng.gen_range(0..=events.len());
                let to_deliver = events.drain(0..len).collect::<Vec<_>>();
                log::info!("Delivering events: {:#?}", to_deliver);
                smol::block_on(scanner.process_events(to_deliver));
                scanner.snapshot().check_invariants();
            } else {
                events.extend(randomly_mutate_tree(root_dir.path(), 0.6, &mut rng).unwrap());
                mutations_len -= 1;
            }

            // Occasionally save an intermediate snapshot to exercise the
            // build_update/apply_update round-trip below.
            if rng.gen_bool(0.2) {
                snapshots.push(scanner.snapshot());
            }
        }
        // Flush all remaining events so the snapshot reflects the final
        // state of the tree on disk.
        log::info!("Quiescing: {:#?}", events);
        smol::block_on(scanner.process_events(events));
        scanner.snapshot().check_invariants();

        // A scanner started from scratch over the final tree must agree with
        // the incrementally-maintained snapshot.
        let (notify_tx, _notify_rx) = smol::channel::unbounded();
        let mut new_scanner = BackgroundScanner::new(
            Arc::new(Mutex::new(initial_snapshot)),
            notify_tx,
            scanner.fs.clone(),
            scanner.executor.clone(),
        );
        smol::block_on(new_scanner.scan_dirs()).unwrap();
        assert_eq!(
            scanner.snapshot().to_vec(true),
            new_scanner.snapshot().to_vec(true)
        );

        for mut prev_snapshot in snapshots {
            let include_ignored = rng.gen::<bool>();
            if !include_ignored {
                // When replaying without ignored entries, strip them from
                // the old snapshot first, mimicking a replica that never
                // received them.
                let mut entries_by_path_edits = Vec::new();
                let mut entries_by_id_edits = Vec::new();
                for entry in prev_snapshot
                    .entries_by_id
                    .cursor::<()>()
                    .filter(|e| e.is_ignored)
                {
                    entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
                    entries_by_id_edits.push(Edit::Remove(entry.id));
                }

                prev_snapshot
                    .entries_by_path
                    .edit(entries_by_path_edits, &());
                prev_snapshot.entries_by_id.edit(entries_by_id_edits, &());
            }

            // Applying the diff between the old snapshot and the current one
            // must reproduce the current snapshot exactly.
            let update = scanner
                .snapshot()
                .build_update(&prev_snapshot, 0, include_ignored);
            prev_snapshot.apply_update(update).unwrap();
            assert_eq!(
                prev_snapshot.to_vec(true),
                scanner.snapshot().to_vec(include_ignored)
            );
        }
    }
3728
    /// Performs one random mutation of the file tree rooted at `root_path` —
    /// creating an entry, writing a `.gitignore`, renaming, or deleting — and
    /// returns synthetic fsevent events for the touched paths, so a scanner
    /// can be driven deterministically from a seeded RNG without a real
    /// fsevents stream.
    fn randomly_mutate_tree(
        root_path: &Path,
        insertion_probability: f64,
        rng: &mut impl Rng,
    ) -> Result<Vec<fsevent::Event>> {
        let root_path = root_path.canonicalize().unwrap();
        let (dirs, files) = read_dir_recursive(root_path.clone());

        let mut events = Vec::new();
        let mut record_event = |path: PathBuf| {
            events.push(fsevent::Event {
                // Fake event id: wall-clock seconds are good enough for
                // these synthetic events.
                event_id: SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                flags: fsevent::StreamFlags::empty(),
                path,
            });
        };

        // Force an insertion while the tree is essentially empty (only the
        // root dir) so there is always something left to rename or delete.
        if (files.is_empty() && dirs.len() == 1) || rng.gen_bool(insertion_probability) {
            let path = dirs.choose(rng).unwrap();
            let new_path = path.join(gen_name(rng));

            if rng.gen() {
                log::info!("Creating dir {:?}", new_path.strip_prefix(root_path)?);
                std::fs::create_dir(&new_path)?;
            } else {
                log::info!("Creating file {:?}", new_path.strip_prefix(root_path)?);
                std::fs::write(&new_path, "")?;
            }
            record_event(new_path);
        } else if rng.gen_bool(0.05) {
            // Occasionally (re)write a .gitignore in a random directory,
            // listing a random subset of its files and subdirectories.
            let ignore_dir_path = dirs.choose(rng).unwrap();
            let ignore_path = ignore_dir_path.join(&*GITIGNORE);

            let (subdirs, subfiles) = read_dir_recursive(ignore_dir_path.clone());
            let files_to_ignore = {
                let len = rng.gen_range(0..=subfiles.len());
                subfiles.choose_multiple(rng, len)
            };
            let dirs_to_ignore = {
                // Exclusive upper bound: subdirs[0] is the directory itself,
                // so at most len - 1 entries are ever ignored.
                let len = rng.gen_range(0..subdirs.len());
                subdirs.choose_multiple(rng, len)
            };

            let mut ignore_contents = String::new();
            for path_to_ignore in files_to_ignore.chain(dirs_to_ignore) {
                write!(
                    ignore_contents,
                    "{}\n",
                    path_to_ignore
                        .strip_prefix(&ignore_dir_path)?
                        .to_str()
                        .unwrap()
                )
                .unwrap();
            }
            log::info!(
                "Creating {:?} with contents:\n{}",
                ignore_path.strip_prefix(&root_path)?,
                ignore_contents
            );
            std::fs::write(&ignore_path, ignore_contents).unwrap();
            record_event(ignore_path);
        } else {
            // Pick an existing file or directory to rename or delete. The
            // root itself (always dirs[0]) is excluded.
            let old_path = {
                let file_path = files.choose(rng);
                let dir_path = dirs[1..].choose(rng);
                file_path.into_iter().chain(dir_path).choose(rng).unwrap()
            };

            let is_rename = rng.gen();
            if is_rename {
                // Never move a directory into its own subtree.
                let new_path_parent = dirs
                    .iter()
                    .filter(|d| !d.starts_with(old_path))
                    .choose(rng)
                    .unwrap();

                // Sometimes clobber an existing directory instead of moving
                // under a fresh name within the chosen parent.
                let overwrite_existing_dir =
                    !old_path.starts_with(&new_path_parent) && rng.gen_bool(0.3);
                let new_path = if overwrite_existing_dir {
                    std::fs::remove_dir_all(&new_path_parent).ok();
                    new_path_parent.to_path_buf()
                } else {
                    new_path_parent.join(gen_name(rng))
                };

                log::info!(
                    "Renaming {:?} to {}{:?}",
                    old_path.strip_prefix(&root_path)?,
                    if overwrite_existing_dir {
                        "overwrite "
                    } else {
                        ""
                    },
                    new_path.strip_prefix(&root_path)?
                );
                std::fs::rename(&old_path, &new_path)?;
                record_event(old_path.clone());
                record_event(new_path);
            } else if old_path.is_dir() {
                // Snapshot the contents before deleting, then emit an event
                // for every removed descendant as well as the dir itself.
                let (dirs, files) = read_dir_recursive(old_path.clone());

                log::info!("Deleting dir {:?}", old_path.strip_prefix(&root_path)?);
                std::fs::remove_dir_all(&old_path).unwrap();
                for file in files {
                    record_event(file);
                }
                for dir in dirs {
                    record_event(dir);
                }
            } else {
                log::info!("Deleting file {:?}", old_path.strip_prefix(&root_path)?);
                std::fs::remove_file(old_path).unwrap();
                record_event(old_path.clone());
            }
        }

        Ok(events)
    }
3851
3852 fn read_dir_recursive(path: PathBuf) -> (Vec<PathBuf>, Vec<PathBuf>) {
3853 let child_entries = std::fs::read_dir(&path).unwrap();
3854 let mut dirs = vec![path];
3855 let mut files = Vec::new();
3856 for child_entry in child_entries {
3857 let child_path = child_entry.unwrap().path();
3858 if child_path.is_dir() {
3859 let (child_dirs, child_files) = read_dir_recursive(child_path);
3860 dirs.extend(child_dirs);
3861 files.extend(child_files);
3862 } else {
3863 files.push(child_path);
3864 }
3865 }
3866 (dirs, files)
3867 }
3868
3869 fn gen_name(rng: &mut impl Rng) -> String {
3870 (0..6)
3871 .map(|_| rng.sample(rand::distributions::Alphanumeric))
3872 .map(char::from)
3873 .collect()
3874 }
3875
    impl Snapshot {
        /// Test-only consistency check: panics if the snapshot's parallel
        /// data structures disagree with one another.
        fn check_invariants(&self) {
            // The file iterators must yield exactly the file entries of
            // `entries_by_path`, in the same order; the second iterator
            // (presumably "visible only" — note the `false` argument) must
            // additionally skip ignored entries.
            let mut files = self.files(true, 0);
            let mut visible_files = self.files(false, 0);
            for entry in self.entries_by_path.cursor::<()>() {
                if entry.is_file() {
                    assert_eq!(files.next().unwrap().inode, entry.inode);
                    if !entry.is_ignored {
                        assert_eq!(visible_files.next().unwrap().inode, entry.inode);
                    }
                }
            }
            assert!(files.next().is_none());
            assert!(visible_files.next().is_none());

            // Walk the tree top-down via `child_entries`, starting at the
            // root path "". Children are inserted at a fixed index while the
            // stack pops from the end, so each directory's children are
            // visited in order before its siblings. The resulting sequence
            // must match the flat order of `entries_by_path`.
            let mut bfs_paths = Vec::new();
            let mut stack = vec![Path::new("")];
            while let Some(path) = stack.pop() {
                bfs_paths.push(path);
                let ix = stack.len();
                for child_entry in self.child_entries(path) {
                    stack.insert(ix, &child_entry.path);
                }
            }

            let dfs_paths = self
                .entries_by_path
                .cursor::<()>()
                .map(|e| e.path.as_ref())
                .collect::<Vec<_>>();
            assert_eq!(bfs_paths, dfs_paths);

            // Every tracked .gitignore must still correspond to entries in
            // the snapshot: both its parent directory and the file itself.
            for (ignore_parent_path, _) in &self.ignores {
                assert!(self.entry_for_path(ignore_parent_path).is_some());
                assert!(self
                    .entry_for_path(ignore_parent_path.join(&*GITIGNORE))
                    .is_some());
            }
        }

        /// Flattens the snapshot into `(path, inode, is_ignored)` tuples
        /// sorted by path, optionally omitting ignored entries — used by
        /// tests to compare snapshots structurally.
        fn to_vec(&self, include_ignored: bool) -> Vec<(&Path, u64, bool)> {
            let mut paths = Vec::new();
            for entry in self.entries_by_path.cursor::<()>() {
                if include_ignored || !entry.is_ignored {
                    paths.push((entry.path.as_ref(), entry.inode, entry.is_ignored));
                }
            }
            paths.sort_by(|a, b| a.0.cmp(&b.0));
            paths
        }
    }
3927}