1use super::{
2 fs::{self, Fs},
3 ignore::IgnoreStack,
4};
5use ::ignore::gitignore::{Gitignore, GitignoreBuilder};
6use anyhow::{anyhow, Context, Result};
7use client::{proto, Client, PeerId, TypedEnvelope};
8use clock::ReplicaId;
9use futures::{Stream, StreamExt};
10use fuzzy::CharBag;
11use gpui::{
12 executor, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
13 Task, UpgradeModelHandle, WeakModelHandle,
14};
15use language::{Buffer, Language, LanguageRegistry, Operation, Rope};
16use lazy_static::lazy_static;
17use lsp::LanguageServer;
18use parking_lot::Mutex;
19use postage::{
20 prelude::{Sink as _, Stream as _},
21 watch,
22};
23
24use serde::Deserialize;
25use smol::channel::{self, Sender};
26use std::{
27 any::Any,
28 cmp::{self, Ordering},
29 collections::HashMap,
30 convert::{TryFrom, TryInto},
31 ffi::{OsStr, OsString},
32 fmt,
33 future::Future,
34 ops::Deref,
35 path::{Path, PathBuf},
36 sync::{
37 atomic::{AtomicUsize, Ordering::SeqCst},
38 Arc,
39 },
40 time::{Duration, SystemTime},
41};
42use sum_tree::Bias;
43use sum_tree::{Edit, SeekTarget, SumTree};
44use util::{ResultExt, TryFutureExt};
45
lazy_static! {
    // Cached `OsStr` for ".gitignore" so file-name comparisons during
    // scanning do not allocate.
    static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
}
49
/// Progress of the background filesystem scan for a local worktree.
#[derive(Clone, Debug)]
enum ScanState {
    // No scan is currently running.
    Idle,
    // The background scanner is actively traversing the tree.
    Scanning,
    // The last scan failed; `Arc` keeps the error cheap to clone.
    Err(Arc<anyhow::Error>),
}
56
/// A directory tree being edited: either rooted on the local filesystem,
/// or mirrored from a remote host over RPC.
pub enum Worktree {
    Local(LocalWorktree),
    Remote(RemoteWorktree),
}
61
/// Events emitted by a `Worktree` model.
pub enum Event {
    // The worktree was closed/unshared; observers should drop their handles.
    Closed,
}
65
impl Entity for Worktree {
    type Event = Event;

    // Called when the model is dropped: notify the server so it can release
    // any state associated with this worktree.
    fn release(&mut self, cx: &mut MutableAppContext) {
        match self {
            Self::Local(tree) => {
                // Only notify the server if the tree was ever registered
                // (i.e. it has been assigned a remote id).
                if let Some(worktree_id) = *tree.remote_id.borrow() {
                    let rpc = tree.rpc.clone();
                    cx.spawn(|_| async move {
                        if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await {
                            log::error!("error closing worktree: {}", err);
                        }
                    })
                    .detach();
                }
            }
            Self::Remote(tree) => {
                let rpc = tree.client.clone();
                let worktree_id = tree.remote_id;
                cx.spawn(|_| async move {
                    if let Err(err) = rpc.send(proto::LeaveWorktree { worktree_id }).await {
                        log::error!("error closing worktree: {}", err);
                    }
                })
                .detach();
            }
        }
    }

    // Gives local worktrees a chance to shut down their language servers
    // cleanly before the application exits.
    fn app_will_quit(
        &mut self,
        _: &mut MutableAppContext,
    ) -> Option<std::pin::Pin<Box<dyn 'static + Future<Output = ()>>>> {
        use futures::FutureExt;

        if let Self::Local(worktree) = self {
            // `drain` empties the server map; `shutdown` may yield `None`
            // for servers that cannot be shut down.
            let shutdown_futures = worktree
                .language_servers
                .drain()
                .filter_map(|(_, server)| server.shutdown())
                .collect::<Vec<_>>();
            Some(
                async move {
                    futures::future::join_all(shutdown_futures).await;
                }
                .boxed(),
            )
        } else {
            None
        }
    }
}
118
119impl Worktree {
    /// Creates a local worktree rooted at `path` and starts the background
    /// scanner task that watches the filesystem and keeps the shared
    /// snapshot up to date.
    pub async fn open_local(
        rpc: Arc<Client>,
        path: impl Into<Arc<Path>>,
        fs: Arc<dyn Fs>,
        languages: Arc<LanguageRegistry>,
        cx: &mut AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let (tree, scan_states_tx) =
            LocalWorktree::new(rpc, path, fs.clone(), languages, cx).await?;
        tree.update(cx, |tree, cx| {
            let tree = tree.as_local_mut().unwrap();
            let abs_path = tree.snapshot.abs_path.clone();
            let background_snapshot = tree.background_snapshot.clone();
            let background = cx.background().clone();
            // The scanner runs on the background executor; fs events are
            // debounced by 100ms, and scan progress flows back through
            // `scan_states_tx`.
            tree._background_scanner_task = Some(cx.background().spawn(async move {
                let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
                let scanner =
                    BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
                scanner.run(events).await;
            }));
        });
        Ok(tree)
    }
143
144 pub async fn open_remote(
145 rpc: Arc<Client>,
146 id: u64,
147 languages: Arc<LanguageRegistry>,
148 cx: &mut AsyncAppContext,
149 ) -> Result<ModelHandle<Self>> {
150 let response = rpc.request(proto::JoinWorktree { worktree_id: id }).await?;
151 Worktree::remote(response, rpc, languages, cx).await
152 }
153
    /// Builds a `Worktree::Remote` model from the host's `JoinWorktree`
    /// response: deserializes the entry list off the main thread, then wires
    /// up the channels that keep the snapshot in sync with remote updates.
    async fn remote(
        join_response: proto::JoinWorktreeResponse,
        rpc: Arc<Client>,
        languages: Arc<LanguageRegistry>,
        cx: &mut AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let worktree = join_response
            .worktree
            .ok_or_else(|| anyhow!("empty worktree"))?;

        let remote_id = worktree.id;
        let replica_id = join_response.replica_id as ReplicaId;
        let peers = join_response.peers;
        // Lowercased character bag of the root name, used for fuzzy matching.
        let root_char_bag: CharBag = worktree
            .root_name
            .chars()
            .map(|c| c.to_ascii_lowercase())
            .collect();
        let root_name = worktree.root_name.clone();
        // The entry list can be large; deserialize it on the background
        // executor rather than blocking the main thread.
        let (entries_by_path, entries_by_id) = cx
            .background()
            .spawn(async move {
                let mut entries_by_path_edits = Vec::new();
                let mut entries_by_id_edits = Vec::new();
                for entry in worktree.entries {
                    match Entry::try_from((&root_char_bag, entry)) {
                        Ok(entry) => {
                            entries_by_id_edits.push(Edit::Insert(PathEntry {
                                id: entry.id,
                                path: entry.path.clone(),
                                is_ignored: entry.is_ignored,
                                scan_id: 0,
                            }));
                            entries_by_path_edits.push(Edit::Insert(entry));
                        }
                        // Skip malformed entries rather than failing the join.
                        Err(err) => log::warn!("error for remote worktree entry {:?}", err),
                    }
                }

                let mut entries_by_path = SumTree::new();
                let mut entries_by_id = SumTree::new();
                entries_by_path.edit(entries_by_path_edits, &());
                entries_by_id.edit(entries_by_id_edits, &());
                (entries_by_path, entries_by_id)
            })
            .await;

        let worktree = cx.update(|cx| {
            cx.add_model(|cx: &mut ModelContext<Worktree>| {
                let snapshot = Snapshot {
                    id: cx.model_id(),
                    scan_id: 0,
                    // Remote trees have no local filesystem path.
                    abs_path: Path::new("").into(),
                    root_name,
                    root_char_bag,
                    ignores: Default::default(),
                    entries_by_path,
                    entries_by_id,
                    removed_entry_ids: Default::default(),
                    next_entry_id: Default::default(),
                };

                let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
                let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());

                // Apply queued remote updates to the latest snapshot on the
                // background executor, publishing each result on the watch.
                cx.background()
                    .spawn(async move {
                        while let Some(update) = updates_rx.recv().await {
                            let mut snapshot = snapshot_tx.borrow().clone();
                            if let Err(error) = snapshot.apply_update(update) {
                                log::error!("error applying worktree update: {}", error);
                            }
                            *snapshot_tx.borrow_mut() = snapshot;
                        }
                    })
                    .detach();

                // On every published snapshot, re-poll the model. The weak
                // handle lets this task exit once the worktree is dropped.
                {
                    let mut snapshot_rx = snapshot_rx.clone();
                    cx.spawn_weak(|this, mut cx| async move {
                        while let Some(_) = snapshot_rx.recv().await {
                            if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
                                this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
                            } else {
                                break;
                            }
                        }
                    })
                    .detach();
                }

                // Route RPC messages addressed to this worktree's id to the
                // corresponding handler methods.
                let _subscriptions = vec![
                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_add_peer),
                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_remove_peer),
                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_update),
                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_update_buffer),
                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved),
                    rpc.subscribe_to_entity(remote_id, cx, Self::handle_unshare),
                ];

                Worktree::Remote(RemoteWorktree {
                    remote_id,
                    replica_id,
                    snapshot,
                    snapshot_rx,
                    updates_tx,
                    client: rpc.clone(),
                    open_buffers: Default::default(),
                    peers: peers
                        .into_iter()
                        .map(|p| (PeerId(p.peer_id), p.replica_id as ReplicaId))
                        .collect(),
                    queued_operations: Default::default(),
                    languages,
                    _subscriptions,
                })
            })
        });

        Ok(worktree)
    }
275
276 pub fn as_local(&self) -> Option<&LocalWorktree> {
277 if let Worktree::Local(worktree) = self {
278 Some(worktree)
279 } else {
280 None
281 }
282 }
283
284 pub fn as_local_mut(&mut self) -> Option<&mut LocalWorktree> {
285 if let Worktree::Local(worktree) = self {
286 Some(worktree)
287 } else {
288 None
289 }
290 }
291
292 pub fn as_remote_mut(&mut self) -> Option<&mut RemoteWorktree> {
293 if let Worktree::Remote(worktree) = self {
294 Some(worktree)
295 } else {
296 None
297 }
298 }
299
300 pub fn snapshot(&self) -> Snapshot {
301 match self {
302 Worktree::Local(worktree) => worktree.snapshot(),
303 Worktree::Remote(worktree) => worktree.snapshot(),
304 }
305 }
306
307 pub fn replica_id(&self) -> ReplicaId {
308 match self {
309 Worktree::Local(_) => 0,
310 Worktree::Remote(worktree) => worktree.replica_id,
311 }
312 }
313
314 pub fn languages(&self) -> &Arc<LanguageRegistry> {
315 match self {
316 Worktree::Local(worktree) => &worktree.languages,
317 Worktree::Remote(worktree) => &worktree.languages,
318 }
319 }
320
321 pub fn handle_add_peer(
322 &mut self,
323 envelope: TypedEnvelope<proto::AddPeer>,
324 _: Arc<Client>,
325 cx: &mut ModelContext<Self>,
326 ) -> Result<()> {
327 match self {
328 Worktree::Local(worktree) => worktree.add_peer(envelope, cx),
329 Worktree::Remote(worktree) => worktree.add_peer(envelope, cx),
330 }
331 }
332
333 pub fn handle_remove_peer(
334 &mut self,
335 envelope: TypedEnvelope<proto::RemovePeer>,
336 _: Arc<Client>,
337 cx: &mut ModelContext<Self>,
338 ) -> Result<()> {
339 match self {
340 Worktree::Local(worktree) => worktree.remove_peer(envelope, cx),
341 Worktree::Remote(worktree) => worktree.remove_peer(envelope, cx),
342 }
343 }
344
    /// RPC handler: applies a snapshot update sent by the host.
    ///
    /// NOTE(review): assumes this message is only ever routed to remote
    /// worktrees (see the subscriptions set up in `remote`); the `unwrap`
    /// panics otherwise.
    pub fn handle_update(
        &mut self,
        envelope: TypedEnvelope<proto::UpdateWorktree>,
        _: Arc<Client>,
        cx: &mut ModelContext<Self>,
    ) -> anyhow::Result<()> {
        self.as_remote_mut()
            .unwrap()
            .update_from_remote(envelope, cx)
    }
355
    /// RPC handler: a guest asked to open a buffer. Opens it locally and
    /// responds with the buffer contents from a background task.
    pub fn handle_open_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::OpenBuffer>,
        rpc: Arc<Client>,
        cx: &mut ModelContext<Self>,
    ) -> anyhow::Result<()> {
        let receipt = envelope.receipt();

        // NOTE(review): assumes only local (host) worktrees receive this
        // message; `unwrap` panics otherwise.
        let response = self
            .as_local_mut()
            .unwrap()
            .open_remote_buffer(envelope, cx);

        // Failures while loading or responding are logged, not propagated.
        cx.background()
            .spawn(
                async move {
                    rpc.respond(receipt, response.await?).await?;
                    Ok(())
                }
                .log_err(),
            )
            .detach();

        Ok(())
    }
381
    /// RPC handler: a guest closed a shared buffer; drop the host's strong
    /// handle so the buffer can be released.
    ///
    /// NOTE(review): assumes only local (host) worktrees receive this
    /// message; `unwrap` panics otherwise.
    pub fn handle_close_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::CloseBuffer>,
        _: Arc<Client>,
        cx: &mut ModelContext<Self>,
    ) -> anyhow::Result<()> {
        self.as_local_mut()
            .unwrap()
            .close_remote_buffer(envelope, cx)
    }
392
393 pub fn peers(&self) -> &HashMap<PeerId, ReplicaId> {
394 match self {
395 Worktree::Local(worktree) => &worktree.peers,
396 Worktree::Remote(worktree) => &worktree.peers,
397 }
398 }
399
400 pub fn open_buffer(
401 &mut self,
402 path: impl AsRef<Path>,
403 cx: &mut ModelContext<Self>,
404 ) -> Task<Result<ModelHandle<Buffer>>> {
405 match self {
406 Worktree::Local(worktree) => worktree.open_buffer(path.as_ref(), cx),
407 Worktree::Remote(worktree) => worktree.open_buffer(path.as_ref(), cx),
408 }
409 }
410
411 #[cfg(feature = "test-support")]
412 pub fn has_open_buffer(&self, path: impl AsRef<Path>, cx: &AppContext) -> bool {
413 let mut open_buffers: Box<dyn Iterator<Item = _>> = match self {
414 Worktree::Local(worktree) => Box::new(worktree.open_buffers.values()),
415 Worktree::Remote(worktree) => {
416 Box::new(worktree.open_buffers.values().filter_map(|buf| {
417 if let RemoteBuffer::Loaded(buf) = buf {
418 Some(buf)
419 } else {
420 None
421 }
422 }))
423 }
424 };
425
426 let path = path.as_ref();
427 open_buffers
428 .find(|buffer| {
429 if let Some(file) = buffer.upgrade(cx).and_then(|buffer| buffer.read(cx).file()) {
430 file.path().as_ref() == path
431 } else {
432 false
433 }
434 })
435 .is_some()
436 }
437
438 pub fn handle_update_buffer(
439 &mut self,
440 envelope: TypedEnvelope<proto::UpdateBuffer>,
441 _: Arc<Client>,
442 cx: &mut ModelContext<Self>,
443 ) -> Result<()> {
444 let payload = envelope.payload.clone();
445 let buffer_id = payload.buffer_id as usize;
446 let ops = payload
447 .operations
448 .into_iter()
449 .map(|op| language::proto::deserialize_operation(op))
450 .collect::<Result<Vec<_>, _>>()?;
451
452 match self {
453 Worktree::Local(worktree) => {
454 let buffer = worktree
455 .open_buffers
456 .get(&buffer_id)
457 .and_then(|buf| buf.upgrade(cx))
458 .ok_or_else(|| {
459 anyhow!("invalid buffer {} in update buffer message", buffer_id)
460 })?;
461 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
462 }
463 Worktree::Remote(worktree) => match worktree.open_buffers.get_mut(&buffer_id) {
464 Some(RemoteBuffer::Operations(pending_ops)) => pending_ops.extend(ops),
465 Some(RemoteBuffer::Loaded(buffer)) => {
466 if let Some(buffer) = buffer.upgrade(cx) {
467 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
468 } else {
469 worktree
470 .open_buffers
471 .insert(buffer_id, RemoteBuffer::Operations(ops));
472 }
473 }
474 None => {
475 worktree
476 .open_buffers
477 .insert(buffer_id, RemoteBuffer::Operations(ops));
478 }
479 },
480 }
481
482 Ok(())
483 }
484
    /// RPC handler: saves a guest's shared buffer on the host's filesystem
    /// and acknowledges with the resulting version and mtime.
    pub fn handle_save_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::SaveBuffer>,
        rpc: Arc<Client>,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        let sender_id = envelope.original_sender_id()?;
        // A guest may only save buffers that were previously shared with it.
        let buffer = self
            .as_local()
            .unwrap()
            .shared_buffers
            .get(&sender_id)
            .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
            .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;

        let receipt = envelope.receipt();
        let worktree_id = envelope.payload.worktree_id;
        let buffer_id = envelope.payload.buffer_id;
        // The save itself runs as a foreground task on the buffer model.
        let save = cx.spawn(|_, mut cx| async move {
            buffer.update(&mut cx, |buffer, cx| buffer.save(cx))?.await
        });

        // Respond from a background task; failures are logged, not
        // propagated to the caller.
        cx.background()
            .spawn(
                async move {
                    let (version, mtime) = save.await?;

                    rpc.respond(
                        receipt,
                        proto::BufferSaved {
                            worktree_id,
                            buffer_id,
                            version: (&version).into(),
                            mtime: Some(mtime.into()),
                        },
                    )
                    .await?;

                    Ok(())
                }
                .log_err(),
            )
            .detach();

        Ok(())
    }
531
532 pub fn handle_buffer_saved(
533 &mut self,
534 envelope: TypedEnvelope<proto::BufferSaved>,
535 _: Arc<Client>,
536 cx: &mut ModelContext<Self>,
537 ) -> Result<()> {
538 let payload = envelope.payload.clone();
539 let worktree = self.as_remote_mut().unwrap();
540 if let Some(buffer) = worktree
541 .open_buffers
542 .get(&(payload.buffer_id as usize))
543 .and_then(|buf| buf.upgrade(cx))
544 {
545 buffer.update(cx, |buffer, cx| {
546 let version = payload.version.try_into()?;
547 let mtime = payload
548 .mtime
549 .ok_or_else(|| anyhow!("missing mtime"))?
550 .into();
551 buffer.did_save(version, mtime, None, cx);
552 Result::<_, anyhow::Error>::Ok(())
553 })?;
554 }
555 Ok(())
556 }
557
    /// RPC handler: the host stopped sharing this worktree; emits
    /// `Event::Closed` so observers tear down their handles.
    pub fn handle_unshare(
        &mut self,
        _: TypedEnvelope<proto::UnshareWorktree>,
        _: Arc<Client>,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        cx.emit(Event::Closed);
        Ok(())
    }
567
    /// Refreshes the foreground snapshot from the background copy and keeps
    /// open buffers' file metadata in sync with it.
    fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
        match self {
            Self::Local(worktree) => {
                let is_fake_fs = worktree.fs.is_fake();
                worktree.snapshot = worktree.background_snapshot.lock().clone();
                if worktree.is_scanning() {
                    // While a scan is in progress, re-poll on a timer instead
                    // of on every change, bounding the cloning cost. Only one
                    // such task is kept in flight at a time.
                    if worktree.poll_task.is_none() {
                        worktree.poll_task = Some(cx.spawn(|this, mut cx| async move {
                            // Fake filesystems (tests) only need to yield;
                            // real ones wait 100ms between polls.
                            if is_fake_fs {
                                smol::future::yield_now().await;
                            } else {
                                smol::Timer::after(Duration::from_millis(100)).await;
                            }
                            this.update(&mut cx, |this, cx| {
                                this.as_local_mut().unwrap().poll_task = None;
                                this.poll_snapshot(cx);
                            })
                        }));
                    }
                } else {
                    // Scan finished: stop polling and reconcile buffers once.
                    worktree.poll_task.take();
                    self.update_open_buffers(cx);
                }
            }
            Self::Remote(worktree) => {
                worktree.snapshot = worktree.snapshot_rx.borrow().clone();
                self.update_open_buffers(cx);
            }
        };

        cx.notify();
    }
600
    /// Reconciles every open buffer's `File` with the current snapshot:
    /// entries are matched by id first, then by path, and buffers whose
    /// models were dropped are pruned from the map.
    fn update_open_buffers(&mut self, cx: &mut ModelContext<Self>) {
        let open_buffers: Box<dyn Iterator<Item = _>> = match &self {
            Self::Local(worktree) => Box::new(worktree.open_buffers.iter()),
            Self::Remote(worktree) => {
                // Only loaded remote buffers have a model to update.
                Box::new(worktree.open_buffers.iter().filter_map(|(id, buf)| {
                    if let RemoteBuffer::Loaded(buf) = buf {
                        Some((id, buf))
                    } else {
                        None
                    }
                }))
            }
        };

        let local = self.as_local().is_some();
        let worktree_path = self.abs_path.clone();
        let worktree_handle = cx.handle();
        let mut buffers_to_delete = Vec::new();
        for (buffer_id, buffer) in open_buffers {
            if let Some(buffer) = buffer.upgrade(cx) {
                buffer.update(cx, |buffer, cx| {
                    if let Some(old_file) = buffer.file() {
                        // Prefer matching the entry by its stable id (tracks
                        // renames)...
                        let new_file = if let Some(entry) = old_file
                            .entry_id()
                            .and_then(|entry_id| self.entry_for_id(entry_id))
                        {
                            File {
                                is_local: local,
                                worktree_path: worktree_path.clone(),
                                entry_id: Some(entry.id),
                                mtime: entry.mtime,
                                path: entry.path.clone(),
                                worktree: worktree_handle.clone(),
                            }
                        // ...then by path (entry was recreated)...
                        } else if let Some(entry) = self.entry_for_path(old_file.path().as_ref()) {
                            File {
                                is_local: local,
                                worktree_path: worktree_path.clone(),
                                entry_id: Some(entry.id),
                                mtime: entry.mtime,
                                path: entry.path.clone(),
                                worktree: worktree_handle.clone(),
                            }
                        // ...otherwise the file is gone: keep the old path
                        // and mtime but drop the entry id.
                        } else {
                            File {
                                is_local: local,
                                worktree_path: worktree_path.clone(),
                                entry_id: None,
                                path: old_file.path().clone(),
                                mtime: old_file.mtime(),
                                worktree: worktree_handle.clone(),
                            }
                        };

                        if let Some(task) = buffer.file_updated(Box::new(new_file), cx) {
                            task.detach();
                        }
                    }
                });
            } else {
                // The buffer model was dropped; remove it after iteration.
                buffers_to_delete.push(*buffer_id);
            }
        }

        for buffer_id in buffers_to_delete {
            match self {
                Self::Local(worktree) => {
                    worktree.open_buffers.remove(&buffer_id);
                }
                Self::Remote(worktree) => {
                    worktree.open_buffers.remove(&buffer_id);
                }
            }
        }
    }
676
    /// Routes LSP diagnostics to the open buffer for the file they concern,
    /// or stores them for later if that file is not currently open.
    fn update_diagnostics(
        &mut self,
        params: lsp::PublishDiagnosticsParams,
        cx: &mut ModelContext<Worktree>,
    ) -> Result<()> {
        let this = self.as_local_mut().ok_or_else(|| anyhow!("not local"))?;
        // Convert the URI to a path relative to the worktree root; reject
        // diagnostics for files outside the tree.
        let file_path = params
            .uri
            .to_file_path()
            .map_err(|_| anyhow!("URI is not a file"))?
            .strip_prefix(&this.abs_path)
            .context("path is not within worktree")?
            .to_owned();

        for buffer in this.open_buffers.values() {
            if let Some(buffer) = buffer.upgrade(cx) {
                if buffer
                    .read(cx)
                    .file()
                    .map_or(false, |file| file.path().as_ref() == file_path)
                {
                    let (remote_id, operation) = buffer.update(cx, |buffer, cx| {
                        (
                            buffer.remote_id(),
                            buffer.update_diagnostics(params.version, params.diagnostics, cx),
                        )
                    });
                    // Broadcast the resulting operation to collaborators.
                    self.send_buffer_update(remote_id, operation?, cx);
                    return Ok(());
                }
            }
        }

        // No matching open buffer: keep the diagnostics so they can be
        // applied when the file is opened (see `open_buffer`).
        this.diagnostics.insert(file_path, params.diagnostics);
        Ok(())
    }
713
    /// Sends a buffer operation to the server; if the send fails, the
    /// operation is queued for a later retry instead of being lost.
    fn send_buffer_update(
        &mut self,
        buffer_id: u64,
        operation: Operation,
        cx: &mut ModelContext<Self>,
    ) {
        // Only send when connected: a local tree must have a remote id, and
        // a remote tree always has one.
        if let Some((rpc, remote_id)) = match self {
            Worktree::Local(worktree) => worktree
                .remote_id
                .borrow()
                .map(|id| (worktree.rpc.clone(), id)),
            Worktree::Remote(worktree) => Some((worktree.client.clone(), worktree.remote_id)),
        } {
            cx.spawn(|worktree, mut cx| async move {
                if let Err(error) = rpc
                    .request(proto::UpdateBuffer {
                        worktree_id: remote_id,
                        buffer_id,
                        operations: vec![language::proto::serialize_operation(&operation)],
                    })
                    .await
                {
                    worktree.update(&mut cx, |worktree, _| {
                        log::error!("error sending buffer operation: {}", error);
                        // Queue the operation so it is not dropped.
                        match worktree {
                            Worktree::Local(t) => &mut t.queued_operations,
                            Worktree::Remote(t) => &mut t.queued_operations,
                        }
                        .push((buffer_id, operation));
                    });
                }
            })
            .detach();
        }
    }
749}
750
751impl Deref for Worktree {
752 type Target = Snapshot;
753
754 fn deref(&self) -> &Self::Target {
755 match self {
756 Worktree::Local(worktree) => &worktree.snapshot,
757 Worktree::Remote(worktree) => &worktree.snapshot,
758 }
759 }
760}
761
/// Worktree state for a directory on the local filesystem.
pub struct LocalWorktree {
    // Foreground copy of the tree state; refreshed by `poll_snapshot`.
    snapshot: Snapshot,
    // Parsed contents of the worktree's `.zed.toml`, if present.
    config: WorktreeConfig,
    // Snapshot mutated by the background scanner.
    background_snapshot: Arc<Mutex<Snapshot>>,
    // Most recent state reported by the background scanner.
    last_scan_state_rx: watch::Receiver<ScanState>,
    _background_scanner_task: Option<Task<()>>,
    _maintain_remote_id_task: Task<Option<()>>,
    // Pending debounced re-poll while a scan is in progress.
    poll_task: Option<Task<()>>,
    // Server-assigned worktree id; `None` while disconnected.
    remote_id: watch::Receiver<Option<u64>>,
    share: Option<ShareState>,
    // Weak handles to buffers opened from this tree, keyed by buffer id.
    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
    // Strong handles to buffers shared with each guest peer.
    shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
    // Diagnostics received for files that are not currently open.
    diagnostics: HashMap<PathBuf, Vec<lsp::Diagnostic>>,
    peers: HashMap<PeerId, ReplicaId>,
    // Buffer operations that failed to send and await retry.
    queued_operations: Vec<(u64, Operation)>,
    languages: Arc<LanguageRegistry>,
    rpc: Arc<Client>,
    fs: Arc<dyn Fs>,
    // Running language servers, keyed by language name.
    language_servers: HashMap<String, Arc<LanguageServer>>,
}
782
/// Per-worktree settings loaded from `.zed.toml` at the tree root.
#[derive(Default, Deserialize)]
struct WorktreeConfig {
    // Logins of users allowed to collaborate on this worktree when shared
    // (sent to the server in `OpenWorktree` as `collaborator_logins`).
    collaborators: Vec<String>,
}
787
788impl LocalWorktree {
    /// Creates the `LocalWorktree` model for `path`: reads root metadata and
    /// `.zed.toml`, registers the worktree with the server whenever the
    /// client connects, and forwards scanner state changes to the model.
    ///
    /// Returns the model plus the sender the background scanner should use
    /// to report scan states (the scanner itself is started by
    /// `Worktree::open_local`).
    async fn new(
        rpc: Arc<Client>,
        path: impl Into<Arc<Path>>,
        fs: Arc<dyn Fs>,
        languages: Arc<LanguageRegistry>,
        cx: &mut AsyncAppContext,
    ) -> Result<(ModelHandle<Worktree>, Sender<ScanState>)> {
        let abs_path = path.into();
        // Entries are stored relative to the root, which is the empty path.
        let path: Arc<Path> = Arc::from(Path::new(""));
        let next_entry_id = AtomicUsize::new(0);

        // After determining whether the root entry is a file or a directory, populate the
        // snapshot's "root name", which will be used for the purpose of fuzzy matching.
        let root_name = abs_path
            .file_name()
            .map_or(String::new(), |f| f.to_string_lossy().to_string());
        let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
        let metadata = fs.metadata(&abs_path).await?;

        // Best-effort load of `.zed.toml`; missing or malformed files fall
        // back to the default config.
        let mut config = WorktreeConfig::default();
        if let Ok(zed_toml) = fs.load(&abs_path.join(".zed.toml")).await {
            if let Ok(parsed) = toml::from_str(&zed_toml) {
                config = parsed;
            }
        }

        let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
        let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
        let tree = cx.add_model(move |cx: &mut ModelContext<Worktree>| {
            let mut snapshot = Snapshot {
                id: cx.model_id(),
                scan_id: 0,
                abs_path,
                root_name: root_name.clone(),
                root_char_bag,
                ignores: Default::default(),
                entries_by_path: Default::default(),
                entries_by_id: Default::default(),
                removed_entry_ids: Default::default(),
                next_entry_id: Arc::new(next_entry_id),
            };
            // Seed the snapshot with the root entry if the path exists.
            if let Some(metadata) = metadata {
                snapshot.insert_entry(
                    Entry::new(
                        path.into(),
                        &metadata,
                        &snapshot.next_entry_id,
                        snapshot.root_char_bag,
                    ),
                    fs.as_ref(),
                );
            }

            // Re-register the worktree with the server on every reconnect,
            // publishing the assigned id (or `None` while disconnected).
            let (mut remote_id_tx, remote_id_rx) = watch::channel();
            let _maintain_remote_id_task = cx.spawn_weak({
                let rpc = rpc.clone();
                move |this, cx| {
                    async move {
                        let mut status = rpc.status();
                        while let Some(status) = status.recv().await {
                            if let Some(this) = this.upgrade(&cx) {
                                let remote_id = if let client::Status::Connected { .. } = status {
                                    let collaborator_logins = this.read_with(&cx, |this, _| {
                                        this.as_local().unwrap().config.collaborators.clone()
                                    });
                                    let response = rpc
                                        .request(proto::OpenWorktree {
                                            root_name: root_name.clone(),
                                            collaborator_logins,
                                        })
                                        .await?;

                                    Some(response.worktree_id)
                                } else {
                                    None
                                };
                                if remote_id_tx.send(remote_id).await.is_err() {
                                    break;
                                }
                            }
                        }
                        Ok(())
                    }
                    .log_err()
                }
            });

            let tree = Self {
                snapshot: snapshot.clone(),
                config,
                remote_id: remote_id_rx,
                background_snapshot: Arc::new(Mutex::new(snapshot)),
                last_scan_state_rx,
                _background_scanner_task: None,
                _maintain_remote_id_task,
                share: None,
                poll_task: None,
                open_buffers: Default::default(),
                shared_buffers: Default::default(),
                diagnostics: Default::default(),
                queued_operations: Default::default(),
                peers: Default::default(),
                languages,
                rpc,
                fs,
                language_servers: Default::default(),
            };

            // Forward scanner state changes into the model: update the watch,
            // re-poll the snapshot, and — once a scan completes while the
            // tree is shared — forward the fresh snapshot to collaborators.
            cx.spawn_weak(|this, mut cx| async move {
                while let Ok(scan_state) = scan_states_rx.recv().await {
                    if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
                        let to_send = handle.update(&mut cx, |this, cx| {
                            last_scan_state_tx.blocking_send(scan_state).ok();
                            this.poll_snapshot(cx);
                            let tree = this.as_local_mut().unwrap();
                            if !tree.is_scanning() {
                                if let Some(share) = tree.share.as_ref() {
                                    return Some((tree.snapshot(), share.snapshots_tx.clone()));
                                }
                            }
                            None
                        });

                        if let Some((snapshot, snapshots_to_send_tx)) = to_send {
                            if let Err(err) = snapshots_to_send_tx.send(snapshot).await {
                                log::error!("error submitting snapshot to send {}", err);
                            }
                        }
                    } else {
                        // The worktree was dropped; stop forwarding.
                        break;
                    }
                }
            })
            .detach();

            Worktree::Local(tree)
        });

        Ok((tree, scan_states_tx))
    }
929
    /// The language registry used to assign languages to open buffers.
    pub fn languages(&self) -> &LanguageRegistry {
        &self.languages
    }
933
    /// Returns the running language server for `language`, starting one (and
    /// wiring up its diagnostics stream) if none exists yet. Returns `None`
    /// if the language has no server or it failed to start.
    pub fn ensure_language_server(
        &mut self,
        language: &Language,
        cx: &mut ModelContext<Worktree>,
    ) -> Option<Arc<LanguageServer>> {
        // Reuse an already-running server for this language.
        if let Some(server) = self.language_servers.get(language.name()) {
            return Some(server.clone());
        }

        if let Some(language_server) = language
            .start_server(self.abs_path(), cx)
            .log_err()
            .flatten()
        {
            // Bridge `PublishDiagnostics` notifications into a channel so
            // they can be handled on the app's executor.
            let (diagnostics_tx, diagnostics_rx) = smol::channel::unbounded();
            language_server
                .on_notification::<lsp::notification::PublishDiagnostics, _>(move |params| {
                    smol::block_on(diagnostics_tx.send(params)).ok();
                })
                .detach();
            cx.spawn_weak(|this, mut cx| async move {
                while let Ok(diagnostics) = diagnostics_rx.recv().await {
                    if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
                        handle.update(&mut cx, |this, cx| {
                            this.update_diagnostics(diagnostics, cx).log_err();
                        });
                    } else {
                        // The worktree was dropped; stop listening.
                        break;
                    }
                }
            })
            .detach();

            self.language_servers
                .insert(language.name().to_string(), language_server.clone());
            Some(language_server.clone())
        } else {
            None
        }
    }
974
    /// Opens a buffer for `path`, reusing an already-open buffer for the
    /// same path when possible. Loads the file, selects a language, starts a
    /// language server if needed, and applies any stored diagnostics.
    pub fn open_buffer(
        &mut self,
        path: &Path,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        let handle = cx.handle();

        // If there is already a buffer for the given path, then return it.
        let mut existing_buffer = None;
        // `retain` doubles as garbage collection for dropped weak handles.
        self.open_buffers.retain(|_buffer_id, buffer| {
            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                if let Some(file) = buffer.read(cx.as_ref()).file() {
                    if file.worktree_id() == handle.id() && file.path().as_ref() == path {
                        existing_buffer = Some(buffer);
                    }
                }
                true
            } else {
                false
            }
        });

        let path = Arc::from(path);
        cx.spawn(|this, mut cx| async move {
            if let Some(existing_buffer) = existing_buffer {
                Ok(existing_buffer)
            } else {
                let (file, contents) = this
                    .update(&mut cx, |this, cx| this.as_local().unwrap().load(&path, cx))
                    .await?;
                let language = this.read_with(&cx, |this, _| {
                    use language::File;
                    this.languages().select_language(file.full_path()).cloned()
                });
                // Take any diagnostics that arrived while the file was
                // closed, and make sure the language's server is running.
                let (diagnostics, language_server) = this.update(&mut cx, |this, cx| {
                    let this = this.as_local_mut().unwrap();
                    (
                        this.diagnostics.remove(path.as_ref()),
                        language
                            .as_ref()
                            .and_then(|language| this.ensure_language_server(language, cx)),
                    )
                });
                let buffer = cx.add_model(|cx| {
                    let mut buffer = Buffer::from_file(0, contents, Box::new(file), cx);
                    buffer.set_language(language, language_server, cx);
                    if let Some(diagnostics) = diagnostics {
                        buffer.update_diagnostics(None, diagnostics, cx).unwrap();
                    }
                    buffer
                });
                this.update(&mut cx, |this, _| {
                    let this = this
                        .as_local_mut()
                        .ok_or_else(|| anyhow!("must be a local worktree"))?;

                    // Track the new buffer so future opens can reuse it.
                    this.open_buffers.insert(buffer.id(), buffer.downgrade());
                    Ok(buffer)
                })
            }
        })
    }
1037
    /// Opens a buffer on behalf of a guest peer, retains a strong handle in
    /// `shared_buffers` for that peer, and produces the response payload
    /// containing the serialized buffer.
    pub fn open_remote_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::OpenBuffer>,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<proto::OpenBufferResponse>> {
        let peer_id = envelope.original_sender_id();
        let path = Path::new(&envelope.payload.path);

        let buffer = self.open_buffer(path, cx);

        cx.spawn(|this, mut cx| async move {
            let buffer = buffer.await?;
            this.update(&mut cx, |this, cx| {
                // Keep the buffer alive for as long as the peer has it open.
                this.as_local_mut()
                    .unwrap()
                    .shared_buffers
                    .entry(peer_id?)
                    .or_default()
                    .insert(buffer.id() as u64, buffer.clone());

                Ok(proto::OpenBufferResponse {
                    buffer: Some(buffer.update(cx.as_mut(), |buffer, _| buffer.to_proto())),
                })
            })
        })
    }
1064
1065 pub fn close_remote_buffer(
1066 &mut self,
1067 envelope: TypedEnvelope<proto::CloseBuffer>,
1068 cx: &mut ModelContext<Worktree>,
1069 ) -> Result<()> {
1070 if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
1071 shared_buffers.remove(&envelope.payload.buffer_id);
1072 cx.notify();
1073 }
1074
1075 Ok(())
1076 }
1077
1078 pub fn add_peer(
1079 &mut self,
1080 envelope: TypedEnvelope<proto::AddPeer>,
1081 cx: &mut ModelContext<Worktree>,
1082 ) -> Result<()> {
1083 let peer = envelope
1084 .payload
1085 .peer
1086 .as_ref()
1087 .ok_or_else(|| anyhow!("empty peer"))?;
1088 self.peers
1089 .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
1090 cx.notify();
1091
1092 Ok(())
1093 }
1094
1095 pub fn remove_peer(
1096 &mut self,
1097 envelope: TypedEnvelope<proto::RemovePeer>,
1098 cx: &mut ModelContext<Worktree>,
1099 ) -> Result<()> {
1100 let peer_id = PeerId(envelope.payload.peer_id);
1101 let replica_id = self
1102 .peers
1103 .remove(&peer_id)
1104 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
1105 self.shared_buffers.remove(&peer_id);
1106 for (_, buffer) in &self.open_buffers {
1107 if let Some(buffer) = buffer.upgrade(cx) {
1108 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
1109 }
1110 }
1111 cx.notify();
1112
1113 Ok(())
1114 }
1115
1116 pub fn scan_complete(&self) -> impl Future<Output = ()> {
1117 let mut scan_state_rx = self.last_scan_state_rx.clone();
1118 async move {
1119 let mut scan_state = Some(scan_state_rx.borrow().clone());
1120 while let Some(ScanState::Scanning) = scan_state {
1121 scan_state = scan_state_rx.recv().await;
1122 }
1123 }
1124 }
1125
    /// The server-assigned id of this worktree, or `None` while disconnected.
    pub fn remote_id(&self) -> Option<u64> {
        *self.remote_id.borrow()
    }
1129
    /// A future resolving to the next remote id assigned by the server.
    /// Skips `None` values published while disconnected; resolves to `None`
    /// only if the channel closes before an id arrives.
    pub fn next_remote_id(&self) -> impl Future<Output = Option<u64>> {
        let mut remote_id = self.remote_id.clone();
        async move {
            while let Some(remote_id) = remote_id.recv().await {
                if remote_id.is_some() {
                    return remote_id;
                }
            }
            None
        }
    }
1141
1142 fn is_scanning(&self) -> bool {
1143 if let ScanState::Scanning = *self.last_scan_state_rx.borrow() {
1144 true
1145 } else {
1146 false
1147 }
1148 }
1149
    /// Returns a clone of the current foreground snapshot.
    pub fn snapshot(&self) -> Snapshot {
        self.snapshot.clone()
    }
1153
    /// The absolute path of the worktree root on the local filesystem.
    pub fn abs_path(&self) -> &Path {
        self.snapshot.abs_path.as_ref()
    }
1157
    /// Whether the given absolute path lies within this worktree's root.
    pub fn contains_abs_path(&self, path: &Path) -> bool {
        path.starts_with(&self.snapshot.abs_path)
    }
1161
    /// Converts a worktree-relative path to an absolute one. The root entry
    /// is stored as the empty path (no file name), which maps to the
    /// worktree's own absolute path.
    fn absolutize(&self, path: &Path) -> PathBuf {
        if path.file_name().is_some() {
            self.snapshot.abs_path.join(path)
        } else {
            self.snapshot.abs_path.to_path_buf()
        }
    }
1169
    /// Loads the file at the worktree-relative `path`, refreshing its entry
    /// in the background snapshot so the returned `File` carries up-to-date
    /// id/mtime metadata.
    fn load(&self, path: &Path, cx: &mut ModelContext<Worktree>) -> Task<Result<(File, String)>> {
        let handle = cx.handle();
        let path = Arc::from(path);
        let worktree_path = self.abs_path.clone();
        let abs_path = self.absolutize(&path);
        let background_snapshot = self.background_snapshot.clone();
        let fs = self.fs.clone();
        cx.spawn(|this, mut cx| async move {
            let text = fs.load(&abs_path).await?;
            // Eagerly populate the snapshot with an updated entry for the loaded file
            let entry = refresh_entry(fs.as_ref(), &background_snapshot, path, &abs_path).await?;
            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
            Ok((
                File {
                    entry_id: Some(entry.id),
                    worktree: handle,
                    worktree_path,
                    path: entry.path,
                    mtime: entry.mtime,
                    is_local: true,
                },
                text,
            ))
        })
    }
1195
    /// Saves `text` to `path` and rebinds `buffer` to the resulting file,
    /// registering it in this worktree's open-buffer table.
    pub fn save_buffer_as(
        &self,
        buffer: ModelHandle<Buffer>,
        path: impl Into<Arc<Path>>,
        text: Rope,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<File>> {
        let save = self.save(path, text, cx);
        cx.spawn(|this, mut cx| async move {
            let entry = save.await?;
            this.update(&mut cx, |this, cx| {
                let this = this.as_local_mut().unwrap();
                // Track the buffer weakly so it can be found by path later.
                this.open_buffers.insert(buffer.id(), buffer.downgrade());
                Ok(File {
                    entry_id: Some(entry.id),
                    worktree: cx.handle(),
                    worktree_path: this.abs_path.clone(),
                    path: entry.path,
                    mtime: entry.mtime,
                    is_local: true,
                })
            })
        })
    }
1220
    /// Writes `text` to `path` on disk (on a background thread) and refreshes
    /// the corresponding snapshot entry, returning the updated entry.
    fn save(
        &self,
        path: impl Into<Arc<Path>>,
        text: Rope,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<Entry>> {
        let path = path.into();
        let abs_path = self.absolutize(&path);
        let background_snapshot = self.background_snapshot.clone();
        let fs = self.fs.clone();
        let save = cx.background().spawn(async move {
            fs.save(&abs_path, &text).await?;
            // Update the background snapshot immediately rather than waiting
            // for the filesystem scanner to observe the change.
            refresh_entry(fs.as_ref(), &background_snapshot, path.clone(), &abs_path).await
        });

        cx.spawn(|this, mut cx| async move {
            let entry = save.await?;
            // Surface the refreshed background snapshot on the foreground.
            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
            Ok(entry)
        })
    }
1242
    /// Registers this worktree with the server and starts streaming snapshot
    /// diffs to it. Resolves to the worktree's server-assigned id.
    pub fn share(&mut self, cx: &mut ModelContext<Worktree>) -> Task<anyhow::Result<u64>> {
        let snapshot = self.snapshot();
        let share_request = self.share_request(cx);
        let rpc = self.rpc.clone();
        cx.spawn(|this, mut cx| async move {
            let share_request = if let Some(request) = share_request.await {
                request
            } else {
                return Err(anyhow!("failed to open worktree on the server"));
            };

            let remote_id = share_request.worktree.as_ref().unwrap().id;
            let share_response = rpc.request(share_request).await?;

            log::info!("sharing worktree {:?}", share_response);
            let (snapshots_to_send_tx, snapshots_to_send_rx) =
                smol::channel::unbounded::<Snapshot>();

            // Background task: diff each enqueued snapshot against the last one
            // that was sent successfully, and ship the delta to the server.
            // It ends when the sending half (in `ShareState`) is dropped.
            cx.background()
                .spawn({
                    let rpc = rpc.clone();
                    async move {
                        let mut prev_snapshot = snapshot;
                        while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
                            let message = snapshot.build_update(&prev_snapshot, remote_id, false);
                            match rpc.send(message).await {
                                // Only advance the baseline on successful send,
                                // so a failed diff is retried against it later.
                                Ok(()) => prev_snapshot = snapshot,
                                Err(err) => log::error!("error sending snapshot diff {}", err),
                            }
                        }
                    }
                })
                .detach();

            this.update(&mut cx, |worktree, cx| {
                // Route per-worktree RPC messages to their handlers while shared.
                let _subscriptions = vec![
                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_add_peer),
                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_remove_peer),
                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_open_buffer),
                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_close_buffer),
                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_update_buffer),
                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_save_buffer),
                ];

                let worktree = worktree.as_local_mut().unwrap();
                worktree.share = Some(ShareState {
                    snapshots_tx: snapshots_to_send_tx,
                    _subscriptions,
                });
            });

            Ok(remote_id)
        })
    }
1297
    /// Stops sharing this worktree: drops the share state (which ends the
    /// snapshot-diffing task) and notifies the server asynchronously.
    pub fn unshare(&mut self, cx: &mut ModelContext<Worktree>) {
        self.share.take();
        let rpc = self.rpc.clone();
        let remote_id = self.remote_id();
        cx.foreground()
            .spawn(
                async move {
                    // Only notify the server if the worktree was ever registered.
                    if let Some(worktree_id) = remote_id {
                        rpc.send(proto::UnshareWorktree { worktree_id }).await?;
                    }
                    Ok(())
                }
                .log_err(),
            )
            .detach()
    }
1314
    /// Builds the `ShareWorktree` RPC request on a background thread,
    /// serializing all non-ignored entries of the current snapshot.
    /// Resolves to `None` if the worktree never receives a remote id.
    fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<Option<proto::ShareWorktree>> {
        let remote_id = self.next_remote_id();
        let snapshot = self.snapshot();
        let root_name = self.root_name.clone();
        cx.background().spawn(async move {
            remote_id.await.map(|id| {
                // Ignored entries are excluded from the initial share payload.
                let entries = snapshot
                    .entries_by_path
                    .cursor::<()>()
                    .filter(|e| !e.is_ignored)
                    .map(Into::into)
                    .collect();
                proto::ShareWorktree {
                    worktree: Some(proto::Worktree {
                        id,
                        root_name,
                        entries,
                    }),
                }
            })
        })
    }
1337}
1338
1339fn build_gitignore(abs_path: &Path, fs: &dyn Fs) -> Result<Gitignore> {
1340 let contents = smol::block_on(fs.load(&abs_path))?;
1341 let parent = abs_path.parent().unwrap_or(Path::new("/"));
1342 let mut builder = GitignoreBuilder::new(parent);
1343 for line in contents.lines() {
1344 builder.add_line(Some(abs_path.into()), line)?;
1345 }
1346 Ok(builder.build()?)
1347}
1348
/// Allows a `LocalWorktree` to be used anywhere a `Snapshot` is expected by
/// dereferencing to its foreground snapshot.
impl Deref for LocalWorktree {
    type Target = Snapshot;

    fn deref(&self) -> &Self::Target {
        &self.snapshot
    }
}
1356
/// Debug output delegates to the snapshot, printing the entry hierarchy.
impl fmt::Debug for LocalWorktree {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.snapshot.fmt(f)
    }
}
1362
/// State held while a local worktree is shared with remote collaborators.
struct ShareState {
    // Enqueues snapshots to be diffed and sent to the server.
    snapshots_tx: Sender<Snapshot>,
    // RPC message subscriptions kept alive for the duration of the share.
    _subscriptions: Vec<client::Subscription>,
}
1367
/// A read-only replica of a worktree hosted on another collaborator's
/// machine, kept up to date via RPC updates.
pub struct RemoteWorktree {
    // Server-assigned id shared by all replicas of this worktree.
    remote_id: u64,
    // Latest snapshot surfaced to the foreground.
    snapshot: Snapshot,
    // Receives snapshots produced by the background update-applying task.
    snapshot_rx: watch::Receiver<Snapshot>,
    client: Arc<Client>,
    // Feeds raw `UpdateWorktree` messages to the background task.
    updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
    // Replica id this client uses for buffer operations.
    replica_id: ReplicaId,
    // Buffers opened through this worktree, keyed by buffer id.
    open_buffers: HashMap<usize, RemoteBuffer>,
    // Connected collaborators and the replica ids they operate under.
    peers: HashMap<PeerId, ReplicaId>,
    languages: Arc<LanguageRegistry>,
    // Buffered (buffer_id, operation) pairs — NOTE(review): consumed outside
    // this excerpt; confirm flushing behavior against the rest of the file.
    queued_operations: Vec<(u64, Operation)>,
    _subscriptions: Vec<client::Subscription>,
}
1381
1382impl RemoteWorktree {
    /// Opens (or reuses) a buffer for `path`, fetching its contents from the
    /// host over RPC when no live buffer for that path already exists.
    pub fn open_buffer(
        &mut self,
        path: &Path,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        let mut existing_buffer = None;
        // Prune dropped buffers while searching for a live one with this path.
        self.open_buffers.retain(|_buffer_id, buffer| {
            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                if let Some(file) = buffer.read(cx.as_ref()).file() {
                    if file.worktree_id() == cx.model_id() && file.path().as_ref() == path {
                        existing_buffer = Some(buffer);
                    }
                }
                true
            } else {
                false
            }
        });

        let rpc = self.client.clone();
        let replica_id = self.replica_id;
        let remote_worktree_id = self.remote_id;
        let root_path = self.snapshot.abs_path.clone();
        let path = path.to_string_lossy().to_string();
        cx.spawn_weak(|this, mut cx| async move {
            if let Some(existing_buffer) = existing_buffer {
                Ok(existing_buffer)
            } else {
                // The entry must exist in our replica before asking the host.
                let entry = this
                    .upgrade(&cx)
                    .ok_or_else(|| anyhow!("worktree was closed"))?
                    .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned())
                    .ok_or_else(|| anyhow!("file does not exist"))?;
                let response = rpc
                    .request(proto::OpenBuffer {
                        worktree_id: remote_worktree_id as u64,
                        path,
                    })
                    .await?;

                // The worktree may have been dropped while the request was in flight.
                let this = this
                    .upgrade(&cx)
                    .ok_or_else(|| anyhow!("worktree was closed"))?;
                let file = File {
                    entry_id: Some(entry.id),
                    worktree: this.clone(),
                    worktree_path: root_path,
                    path: entry.path,
                    mtime: entry.mtime,
                    is_local: false,
                };
                let language = this.read_with(&cx, |this, _| {
                    use language::File;
                    this.languages().select_language(file.full_path()).cloned()
                });
                let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
                let buffer_id = remote_buffer.id as usize;
                let buffer = cx.add_model(|cx| {
                    Buffer::from_proto(replica_id, remote_buffer, Some(Box::new(file)), cx)
                        .unwrap()
                        .with_language(language, None, cx)
                });
                this.update(&mut cx, |this, cx| {
                    let this = this.as_remote_mut().unwrap();
                    // Apply any operations that arrived while the buffer was loading.
                    if let Some(RemoteBuffer::Operations(pending_ops)) = this
                        .open_buffers
                        .insert(buffer_id, RemoteBuffer::Loaded(buffer.downgrade()))
                    {
                        buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?;
                    }
                    Result::<_, anyhow::Error>::Ok(())
                })?;
                Ok(buffer)
            }
        })
    }
1459
    /// The server-assigned id shared by all replicas of this worktree.
    pub fn remote_id(&self) -> u64 {
        self.remote_id
    }
1463
1464 pub fn close_all_buffers(&mut self, cx: &mut MutableAppContext) {
1465 for (_, buffer) in self.open_buffers.drain() {
1466 if let RemoteBuffer::Loaded(buffer) = buffer {
1467 if let Some(buffer) = buffer.upgrade(cx) {
1468 buffer.update(cx, |buffer, cx| buffer.close(cx))
1469 }
1470 }
1471 }
1472 }
1473
    /// Returns a cheap clone of the current snapshot.
    fn snapshot(&self) -> Snapshot {
        self.snapshot.clone()
    }
1477
1478 fn update_from_remote(
1479 &mut self,
1480 envelope: TypedEnvelope<proto::UpdateWorktree>,
1481 cx: &mut ModelContext<Worktree>,
1482 ) -> Result<()> {
1483 let mut tx = self.updates_tx.clone();
1484 let payload = envelope.payload.clone();
1485 cx.background()
1486 .spawn(async move {
1487 tx.send(payload).await.expect("receiver runs to completion");
1488 })
1489 .detach();
1490
1491 Ok(())
1492 }
1493
1494 pub fn add_peer(
1495 &mut self,
1496 envelope: TypedEnvelope<proto::AddPeer>,
1497 cx: &mut ModelContext<Worktree>,
1498 ) -> Result<()> {
1499 let peer = envelope
1500 .payload
1501 .peer
1502 .as_ref()
1503 .ok_or_else(|| anyhow!("empty peer"))?;
1504 self.peers
1505 .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId);
1506 cx.notify();
1507 Ok(())
1508 }
1509
1510 pub fn remove_peer(
1511 &mut self,
1512 envelope: TypedEnvelope<proto::RemovePeer>,
1513 cx: &mut ModelContext<Worktree>,
1514 ) -> Result<()> {
1515 let peer_id = PeerId(envelope.payload.peer_id);
1516 let replica_id = self
1517 .peers
1518 .remove(&peer_id)
1519 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?;
1520 for (_, buffer) in &self.open_buffers {
1521 if let Some(buffer) = buffer.upgrade(cx) {
1522 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
1523 }
1524 }
1525 cx.notify();
1526 Ok(())
1527 }
1528}
1529
/// A buffer replicated from the host. Until the initial `OpenBuffer` response
/// arrives, incoming operations are held in the `Operations` variant.
enum RemoteBuffer {
    // Operations received before the buffer finished loading.
    Operations(Vec<Operation>),
    // The live buffer, held weakly so it can be dropped elsewhere.
    Loaded(WeakModelHandle<Buffer>),
}
1534
1535impl RemoteBuffer {
1536 fn upgrade(&self, cx: &impl UpgradeModelHandle) -> Option<ModelHandle<Buffer>> {
1537 match self {
1538 Self::Operations(_) => None,
1539 Self::Loaded(buffer) => buffer.upgrade(cx),
1540 }
1541 }
1542}
1543
/// An immutable view of a worktree's file hierarchy at a point in time.
/// Cloning is cheap: the entry trees are persistent data structures.
#[derive(Clone)]
pub struct Snapshot {
    id: usize,
    // Bumped every time the tree is mutated by a scan or remote update.
    scan_id: usize,
    abs_path: Arc<Path>,
    root_name: String,
    // Characters of the root name, used for fuzzy matching.
    root_char_bag: CharBag,
    // Compiled .gitignore per directory, tagged with the scan that loaded it.
    ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
    // Entries ordered by path; the primary index.
    entries_by_path: SumTree<Entry>,
    // Secondary index ordered by entry id, used when diffing snapshots.
    entries_by_id: SumTree<PathEntry>,
    // Ids of removed entries keyed by inode, so re-created files keep their id.
    removed_entry_ids: HashMap<u64, usize>,
    next_entry_id: Arc<AtomicUsize>,
}
1557
1558impl Snapshot {
    /// A process-local identifier for the worktree this snapshot belongs to.
    pub fn id(&self) -> usize {
        self.id
    }
1562
    /// Computes the `UpdateWorktree` message that transforms `other` (an older
    /// snapshot) into `self`, by merge-joining both entry lists in id order.
    pub fn build_update(
        &self,
        other: &Self,
        worktree_id: u64,
        include_ignored: bool,
    ) -> proto::UpdateWorktree {
        let mut updated_entries = Vec::new();
        let mut removed_entries = Vec::new();
        // `entries_by_id` is ordered by entry id, so both sides can be walked
        // in lockstep like a sorted-list merge.
        let mut self_entries = self
            .entries_by_id
            .cursor::<()>()
            .filter(|e| include_ignored || !e.is_ignored)
            .peekable();
        let mut other_entries = other
            .entries_by_id
            .cursor::<()>()
            .filter(|e| include_ignored || !e.is_ignored)
            .peekable();
        loop {
            match (self_entries.peek(), other_entries.peek()) {
                (Some(self_entry), Some(other_entry)) => {
                    match Ord::cmp(&self_entry.id, &other_entry.id) {
                        // Present here but not in `other`: newly added.
                        Ordering::Less => {
                            let entry = self.entry_for_id(self_entry.id).unwrap().into();
                            updated_entries.push(entry);
                            self_entries.next();
                        }
                        // Present in both: include only if rescanned since `other`.
                        Ordering::Equal => {
                            if self_entry.scan_id != other_entry.scan_id {
                                let entry = self.entry_for_id(self_entry.id).unwrap().into();
                                updated_entries.push(entry);
                            }

                            self_entries.next();
                            other_entries.next();
                        }
                        // Present only in `other`: removed.
                        Ordering::Greater => {
                            removed_entries.push(other_entry.id as u64);
                            other_entries.next();
                        }
                    }
                }
                // Remaining entries on our side are all additions.
                (Some(self_entry), None) => {
                    let entry = self.entry_for_id(self_entry.id).unwrap().into();
                    updated_entries.push(entry);
                    self_entries.next();
                }
                // Remaining entries on the other side are all removals.
                (None, Some(other_entry)) => {
                    removed_entries.push(other_entry.id as u64);
                    other_entries.next();
                }
                (None, None) => break,
            }
        }

        proto::UpdateWorktree {
            updated_entries,
            removed_entries,
            worktree_id,
        }
    }
1624
    /// Applies an `UpdateWorktree` message (produced by `build_update` on the
    /// host) to this snapshot, bumping the scan id.
    fn apply_update(&mut self, update: proto::UpdateWorktree) -> Result<()> {
        self.scan_id += 1;
        let scan_id = self.scan_id;

        let mut entries_by_path_edits = Vec::new();
        let mut entries_by_id_edits = Vec::new();
        for entry_id in update.removed_entries {
            let entry_id = entry_id as usize;
            let entry = self
                .entry_for_id(entry_id)
                .ok_or_else(|| anyhow!("unknown entry"))?;
            entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
            entries_by_id_edits.push(Edit::Remove(entry.id));
        }

        for entry in update.updated_entries {
            let entry = Entry::try_from((&self.root_char_bag, entry))?;
            // If the entry moved, its old path must be removed explicitly or
            // both the old and new paths would remain in the tree.
            if let Some(PathEntry { path, .. }) = self.entries_by_id.get(&entry.id, &()) {
                entries_by_path_edits.push(Edit::Remove(PathKey(path.clone())));
            }
            entries_by_id_edits.push(Edit::Insert(PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                is_ignored: entry.is_ignored,
                scan_id,
            }));
            entries_by_path_edits.push(Edit::Insert(entry));
        }

        self.entries_by_path.edit(entries_by_path_edits, &());
        self.entries_by_id.edit(entries_by_id_edits, &());

        Ok(())
    }
1659
    /// Total number of file entries, including ignored ones.
    pub fn file_count(&self) -> usize {
        self.entries_by_path.summary().file_count
    }

    /// Number of file entries that are not gitignored.
    pub fn visible_file_count(&self) -> usize {
        self.entries_by_path.summary().visible_file_count
    }
1667
1668 fn traverse_from_offset(
1669 &self,
1670 include_dirs: bool,
1671 include_ignored: bool,
1672 start_offset: usize,
1673 ) -> Traversal {
1674 let mut cursor = self.entries_by_path.cursor();
1675 cursor.seek(
1676 &TraversalTarget::Count {
1677 count: start_offset,
1678 include_dirs,
1679 include_ignored,
1680 },
1681 Bias::Right,
1682 &(),
1683 );
1684 Traversal {
1685 cursor,
1686 include_dirs,
1687 include_ignored,
1688 }
1689 }
1690
1691 fn traverse_from_path(
1692 &self,
1693 include_dirs: bool,
1694 include_ignored: bool,
1695 path: &Path,
1696 ) -> Traversal {
1697 let mut cursor = self.entries_by_path.cursor();
1698 cursor.seek(&TraversalTarget::Path(path), Bias::Left, &());
1699 Traversal {
1700 cursor,
1701 include_dirs,
1702 include_ignored,
1703 }
1704 }
1705
    /// Iterates over file entries, starting at file index `start`.
    pub fn files(&self, include_ignored: bool, start: usize) -> Traversal {
        self.traverse_from_offset(false, include_ignored, start)
    }

    /// Iterates over all entries (files and directories) in path order.
    pub fn entries(&self, include_ignored: bool) -> Traversal {
        self.traverse_from_offset(true, include_ignored, 0)
    }
1713
1714 pub fn paths(&self) -> impl Iterator<Item = &Arc<Path>> {
1715 let empty_path = Path::new("");
1716 self.entries_by_path
1717 .cursor::<()>()
1718 .filter(move |entry| entry.path.as_ref() != empty_path)
1719 .map(|entry| &entry.path)
1720 }
1721
1722 fn child_entries<'a>(&'a self, parent_path: &'a Path) -> ChildEntriesIter<'a> {
1723 let mut cursor = self.entries_by_path.cursor();
1724 cursor.seek(&TraversalTarget::Path(parent_path), Bias::Right, &());
1725 let traversal = Traversal {
1726 cursor,
1727 include_dirs: true,
1728 include_ignored: true,
1729 };
1730 ChildEntriesIter {
1731 traversal,
1732 parent_path,
1733 }
1734 }
1735
    /// The entry for the worktree root (the entry with the empty path).
    pub fn root_entry(&self) -> Option<&Entry> {
        self.entry_for_path("")
    }

    /// The file name of the worktree's root directory.
    pub fn root_name(&self) -> &str {
        &self.root_name
    }
1743
1744 pub fn entry_for_path(&self, path: impl AsRef<Path>) -> Option<&Entry> {
1745 let path = path.as_ref();
1746 self.traverse_from_path(true, true, path)
1747 .entry()
1748 .and_then(|entry| {
1749 if entry.path.as_ref() == path {
1750 Some(entry)
1751 } else {
1752 None
1753 }
1754 })
1755 }
1756
1757 pub fn entry_for_id(&self, id: usize) -> Option<&Entry> {
1758 let entry = self.entries_by_id.get(&id, &())?;
1759 self.entry_for_path(&entry.path)
1760 }
1761
1762 pub fn inode_for_path(&self, path: impl AsRef<Path>) -> Option<u64> {
1763 self.entry_for_path(path.as_ref()).map(|e| e.inode)
1764 }
1765
    /// Inserts (or replaces) a single entry in the snapshot, recompiling the
    /// relevant `.gitignore` when the entry is itself a gitignore file.
    fn insert_entry(&mut self, mut entry: Entry, fs: &dyn Fs) -> Entry {
        if !entry.is_dir() && entry.path.file_name() == Some(&GITIGNORE) {
            let abs_path = self.abs_path.join(&entry.path);
            match build_gitignore(&abs_path, fs) {
                Ok(ignore) => {
                    // The ignore applies to the directory containing the file.
                    let ignore_dir_path = entry.path.parent().unwrap();
                    self.ignores
                        .insert(ignore_dir_path.into(), (Arc::new(ignore), self.scan_id));
                }
                Err(error) => {
                    // A broken .gitignore shouldn't prevent tracking the file.
                    log::error!(
                        "error loading .gitignore file {:?} - {:?}",
                        &entry.path,
                        error
                    );
                }
            }
        }

        // Keep entry ids stable across delete/recreate cycles and rescans.
        self.reuse_entry_id(&mut entry);
        self.entries_by_path.insert_or_replace(entry.clone(), &());
        self.entries_by_id.insert_or_replace(
            PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                is_ignored: entry.is_ignored,
                scan_id: self.scan_id,
            },
            &(),
        );
        entry
    }
1798
    /// Fills in the children of a directory that was previously inserted as
    /// `PendingDir`, marking it as a fully-scanned `Dir`.
    fn populate_dir(
        &mut self,
        parent_path: Arc<Path>,
        entries: impl IntoIterator<Item = Entry>,
        ignore: Option<Arc<Gitignore>>,
    ) {
        let mut parent_entry = self
            .entries_by_path
            .get(&PathKey(parent_path.clone()), &())
            .unwrap()
            .clone();
        if let Some(ignore) = ignore {
            self.ignores.insert(parent_path, (ignore, self.scan_id));
        }
        if matches!(parent_entry.kind, EntryKind::PendingDir) {
            parent_entry.kind = EntryKind::Dir;
        } else {
            // Callers only populate directories that are still pending.
            unreachable!();
        }

        let mut entries_by_path_edits = vec![Edit::Insert(parent_entry)];
        let mut entries_by_id_edits = Vec::new();

        for mut entry in entries {
            // Keep entry ids stable across delete/recreate cycles and rescans.
            self.reuse_entry_id(&mut entry);
            entries_by_id_edits.push(Edit::Insert(PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                is_ignored: entry.is_ignored,
                scan_id: self.scan_id,
            }));
            entries_by_path_edits.push(Edit::Insert(entry));
        }

        self.entries_by_path.edit(entries_by_path_edits, &());
        self.entries_by_id.edit(entries_by_id_edits, &());
    }
1836
1837 fn reuse_entry_id(&mut self, entry: &mut Entry) {
1838 if let Some(removed_entry_id) = self.removed_entry_ids.remove(&entry.inode) {
1839 entry.id = removed_entry_id;
1840 } else if let Some(existing_entry) = self.entry_for_path(&entry.path) {
1841 entry.id = existing_entry.id;
1842 }
1843 }
1844
    /// Removes the entry at `path` and all of its descendants, remembering the
    /// removed ids (keyed by inode) so they can be reused if the file returns.
    fn remove_path(&mut self, path: &Path) {
        let mut new_entries;
        let removed_entries;
        {
            // Split the tree into [before path] + [path and descendants] + [rest],
            // keeping the middle slice as the set of removed entries.
            let mut cursor = self.entries_by_path.cursor::<TraversalProgress>();
            new_entries = cursor.slice(&TraversalTarget::Path(path), Bias::Left, &());
            removed_entries = cursor.slice(&TraversalTarget::PathSuccessor(path), Bias::Left, &());
            new_entries.push_tree(cursor.suffix(&()), &());
        }
        self.entries_by_path = new_entries;

        let mut entries_by_id_edits = Vec::new();
        for entry in removed_entries.cursor::<()>() {
            // Keep the highest id seen per inode so a recreated file reuses it.
            let removed_entry_id = self
                .removed_entry_ids
                .entry(entry.inode)
                .or_insert(entry.id);
            *removed_entry_id = cmp::max(*removed_entry_id, entry.id);
            entries_by_id_edits.push(Edit::Remove(entry.id));
        }
        self.entries_by_id.edit(entries_by_id_edits, &());

        if path.file_name() == Some(&GITIGNORE) {
            // Mark the containing directory's ignore as touched in this scan.
            if let Some((_, scan_id)) = self.ignores.get_mut(path.parent().unwrap()) {
                *scan_id = self.scan_id;
            }
        }
    }
1873
    /// Builds the stack of `.gitignore` matchers governing `path`, walking
    /// from the worktree root down to the path's immediate parent.
    fn ignore_stack_for_path(&self, path: &Path, is_dir: bool) -> Arc<IgnoreStack> {
        // Collect ancestors (nearest first), paired with their ignore files.
        let mut new_ignores = Vec::new();
        for ancestor in path.ancestors().skip(1) {
            if let Some((ignore, _)) = self.ignores.get(ancestor) {
                new_ignores.push((ancestor, Some(ignore.clone())));
            } else {
                new_ignores.push((ancestor, None));
            }
        }

        // Apply them root-first; if an ancestor directory is itself ignored,
        // everything beneath it is ignored and the stack collapses to `all`.
        let mut ignore_stack = IgnoreStack::none();
        for (parent_path, ignore) in new_ignores.into_iter().rev() {
            if ignore_stack.is_path_ignored(&parent_path, true) {
                ignore_stack = IgnoreStack::all();
                break;
            } else if let Some(ignore) = ignore {
                ignore_stack = ignore_stack.append(Arc::from(parent_path), ignore);
            }
        }

        if ignore_stack.is_path_ignored(path, is_dir) {
            ignore_stack = IgnoreStack::all();
        }

        ignore_stack
    }
1900}
1901
1902impl fmt::Debug for Snapshot {
1903 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1904 for entry in self.entries_by_path.cursor::<()>() {
1905 for _ in entry.path.ancestors().skip(1) {
1906 write!(f, " ")?;
1907 }
1908 writeln!(f, "{:?} (inode: {})", entry.path, entry.inode)?;
1909 }
1910 Ok(())
1911 }
1912}
1913
/// A handle to a file within a worktree, attached to open buffers.
#[derive(Clone, PartialEq)]
pub struct File {
    // `None` once the underlying entry has been deleted.
    entry_id: Option<usize>,
    worktree: ModelHandle<Worktree>,
    // Absolute path of the worktree root this file belongs to.
    worktree_path: Arc<Path>,
    // Path relative to the worktree root.
    pub path: Arc<Path>,
    pub mtime: SystemTime,
    // False for files replicated from a remote collaborator's machine.
    is_local: bool,
}
1923
1924impl language::File for File {
    /// Identifies the worktree model this file belongs to.
    fn worktree_id(&self) -> usize {
        self.worktree.id()
    }

    /// The snapshot entry id, or `None` if the file has been deleted.
    fn entry_id(&self) -> Option<usize> {
        self.entry_id
    }

    /// Modification time recorded when the file was last loaded or saved.
    fn mtime(&self) -> SystemTime {
        self.mtime
    }

    /// Path relative to the worktree root.
    fn path(&self) -> &Arc<Path> {
        &self.path
    }
1940
1941 fn abs_path(&self) -> Option<PathBuf> {
1942 if self.is_local {
1943 Some(self.worktree_path.join(&self.path))
1944 } else {
1945 None
1946 }
1947 }
1948
1949 fn full_path(&self) -> PathBuf {
1950 let mut full_path = PathBuf::new();
1951 if let Some(worktree_name) = self.worktree_path.file_name() {
1952 full_path.push(worktree_name);
1953 }
1954 full_path.push(&self.path);
1955 full_path
1956 }
1957
1958 /// Returns the last component of this handle's absolute path. If this handle refers to the root
1959 /// of its worktree, then this method will return the name of the worktree itself.
1960 fn file_name<'a>(&'a self) -> Option<OsString> {
1961 self.path
1962 .file_name()
1963 .or_else(|| self.worktree_path.file_name())
1964 .map(Into::into)
1965 }
1966
    /// A file is considered deleted once its worktree entry disappears.
    fn is_deleted(&self) -> bool {
        self.entry_id.is_none()
    }
1970
    /// Persists the buffer's contents. Locally this writes to disk and notifies
    /// the server (if shared); remotely it asks the host to save on our behalf.
    fn save(
        &self,
        buffer_id: u64,
        text: Rope,
        version: clock::Global,
        cx: &mut MutableAppContext,
    ) -> Task<Result<(clock::Global, SystemTime)>> {
        self.worktree.update(cx, |worktree, cx| match worktree {
            Worktree::Local(worktree) => {
                let rpc = worktree.rpc.clone();
                let worktree_id = *worktree.remote_id.borrow();
                let save = worktree.save(self.path.clone(), text, cx);
                cx.background().spawn(async move {
                    let entry = save.await?;
                    // Only notify the server when the worktree is shared.
                    if let Some(worktree_id) = worktree_id {
                        rpc.send(proto::BufferSaved {
                            worktree_id,
                            buffer_id,
                            version: (&version).into(),
                            mtime: Some(entry.mtime.into()),
                        })
                        .await?;
                    }
                    Ok((version, entry.mtime))
                })
            }
            Worktree::Remote(worktree) => {
                let rpc = worktree.client.clone();
                let worktree_id = worktree.remote_id;
                cx.foreground().spawn(async move {
                    // The host performs the save and reports the saved version.
                    let response = rpc
                        .request(proto::SaveBuffer {
                            worktree_id,
                            buffer_id,
                        })
                        .await?;
                    let version = response.version.try_into()?;
                    let mtime = response
                        .mtime
                        .ok_or_else(|| anyhow!("missing mtime"))?
                        .into();
                    Ok((version, mtime))
                })
            }
        })
    }
2017
2018 fn load_local(&self, cx: &AppContext) -> Option<Task<Result<String>>> {
2019 let worktree = self.worktree.read(cx).as_local()?;
2020 let abs_path = worktree.absolutize(&self.path);
2021 let fs = worktree.fs.clone();
2022 Some(
2023 cx.background()
2024 .spawn(async move { fs.load(&abs_path).await }),
2025 )
2026 }
2027
    /// Forwards a locally-generated buffer operation to the worktree so it can
    /// be broadcast to collaborators.
    fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) {
        self.worktree.update(cx, |worktree, cx| {
            worktree.send_buffer_update(buffer_id, operation, cx);
        });
    }
2033
    /// Notifies the host that this buffer is no longer open on our side.
    /// Only relevant for remote worktrees; local ones need no bookkeeping here.
    fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) {
        self.worktree.update(cx, |worktree, cx| {
            if let Worktree::Remote(worktree) = worktree {
                let worktree_id = worktree.remote_id;
                let rpc = worktree.client.clone();
                cx.background()
                    .spawn(async move {
                        // Best-effort: log rather than propagate send failures.
                        if let Err(error) = rpc
                            .send(proto::CloseBuffer {
                                worktree_id,
                                buffer_id,
                            })
                            .await
                        {
                            log::error!("error closing remote buffer: {}", error);
                        }
                    })
                    .detach();
            }
        });
    }
2055
    /// Type-erased clone, required by the `language::File` trait object.
    fn boxed_clone(&self) -> Box<dyn language::File> {
        Box::new(self.clone())
    }

    /// Enables downcasting back to this concrete `File` type.
    fn as_any(&self) -> &dyn Any {
        self
    }
2063}
2064
/// A single file or directory within a worktree snapshot.
#[derive(Clone, Debug)]
pub struct Entry {
    // Stable id, preserved across renames and delete/recreate cycles.
    pub id: usize,
    pub kind: EntryKind,
    // Path relative to the worktree root.
    pub path: Arc<Path>,
    pub inode: u64,
    pub mtime: SystemTime,
    pub is_symlink: bool,
    // True if matched by some .gitignore in scope.
    pub is_ignored: bool,
}
2075
/// Distinguishes files from directories, and directories whose contents have
/// not been scanned yet from fully-populated ones.
#[derive(Clone, Debug)]
pub enum EntryKind {
    // A directory that has been discovered but not yet scanned.
    PendingDir,
    Dir,
    // A file, carrying its path's characters for fuzzy matching.
    File(CharBag),
}
2082
2083impl Entry {
2084 fn new(
2085 path: Arc<Path>,
2086 metadata: &fs::Metadata,
2087 next_entry_id: &AtomicUsize,
2088 root_char_bag: CharBag,
2089 ) -> Self {
2090 Self {
2091 id: next_entry_id.fetch_add(1, SeqCst),
2092 kind: if metadata.is_dir {
2093 EntryKind::PendingDir
2094 } else {
2095 EntryKind::File(char_bag_for_path(root_char_bag, &path))
2096 },
2097 path,
2098 inode: metadata.inode,
2099 mtime: metadata.mtime,
2100 is_symlink: metadata.is_symlink,
2101 is_ignored: false,
2102 }
2103 }
2104
2105 pub fn is_dir(&self) -> bool {
2106 matches!(self.kind, EntryKind::Dir | EntryKind::PendingDir)
2107 }
2108
2109 pub fn is_file(&self) -> bool {
2110 matches!(self.kind, EntryKind::File(_))
2111 }
2112}
2113
2114impl sum_tree::Item for Entry {
2115 type Summary = EntrySummary;
2116
2117 fn summary(&self) -> Self::Summary {
2118 let visible_count = if self.is_ignored { 0 } else { 1 };
2119 let file_count;
2120 let visible_file_count;
2121 if self.is_file() {
2122 file_count = 1;
2123 visible_file_count = visible_count;
2124 } else {
2125 file_count = 0;
2126 visible_file_count = 0;
2127 }
2128
2129 EntrySummary {
2130 max_path: self.path.clone(),
2131 count: 1,
2132 visible_count,
2133 file_count,
2134 visible_file_count,
2135 }
2136 }
2137}
2138
impl sum_tree::KeyedItem for Entry {
    type Key = PathKey;

    /// Entries are keyed (and therefore ordered) by worktree-relative path.
    fn key(&self) -> Self::Key {
        PathKey(self.path.clone())
    }
}
2146
/// Aggregated statistics for a subtree of `Entry` nodes (ordered by path).
#[derive(Clone, Debug)]
pub struct EntrySummary {
    // Greatest path in the subtree (paths are the tree's sort key).
    max_path: Arc<Path>,
    // Total number of entries.
    count: usize,
    // Entries that are not gitignored.
    visible_count: usize,
    // Entries that are regular files.
    file_count: usize,
    // Files that are not gitignored.
    visible_file_count: usize,
}
2155
// Manual impl because `Arc<Path>` does not implement `Default`.
impl Default for EntrySummary {
    fn default() -> Self {
        Self {
            max_path: Arc::from(Path::new("")),
            count: 0,
            visible_count: 0,
            file_count: 0,
            visible_file_count: 0,
        }
    }
}
2167
2168impl sum_tree::Summary for EntrySummary {
2169 type Context = ();
2170
2171 fn add_summary(&mut self, rhs: &Self, _: &()) {
2172 self.max_path = rhs.max_path.clone();
2173 self.visible_count += rhs.visible_count;
2174 self.file_count += rhs.file_count;
2175 self.visible_file_count += rhs.visible_file_count;
2176 }
2177}
2178
/// Secondary-index record for an entry, ordered by id (see `entries_by_id`).
#[derive(Clone, Debug)]
struct PathEntry {
    id: usize,
    path: Arc<Path>,
    is_ignored: bool,
    // Scan in which this entry last changed; used when diffing snapshots.
    scan_id: usize,
}
2186
impl sum_tree::Item for PathEntry {
    type Summary = PathEntrySummary;

    /// A single record's summary is just its own id.
    fn summary(&self) -> Self::Summary {
        PathEntrySummary { max_id: self.id }
    }
}
2194
impl sum_tree::KeyedItem for PathEntry {
    type Key = usize;

    /// Records are keyed (and therefore ordered) by entry id.
    fn key(&self) -> Self::Key {
        self.id
    }
}
2202
/// Summary for the id-ordered index: tracks the greatest id in a subtree.
#[derive(Clone, Debug, Default)]
struct PathEntrySummary {
    max_id: usize,
}
2207
impl sum_tree::Summary for PathEntrySummary {
    type Context = ();

    fn add_summary(&mut self, summary: &Self, _: &Self::Context) {
        // Items are ordered by id, so the right-hand summary's max wins.
        self.max_id = summary.max_id;
    }
}
2215
// Allows seeking the id-ordered tree directly by entry id.
impl<'a> sum_tree::Dimension<'a, PathEntrySummary> for usize {
    fn add_summary(&mut self, summary: &'a PathEntrySummary, _: &()) {
        *self = summary.max_id;
    }
}
2221
/// Ordering key for `entries_by_path`: the entry's worktree-relative path.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct PathKey(Arc<Path>);
2224
impl Default for PathKey {
    // The empty path sorts before all others (it is the root entry's key).
    fn default() -> Self {
        Self(Path::new("").into())
    }
}
2230
// Allows seeking the path-ordered tree directly by path key.
impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.0 = summary.max_path.clone();
    }
}
2236
/// Scans a worktree's directory hierarchy on background threads, keeping the
/// shared snapshot up to date and reporting progress via `notify`.
struct BackgroundScanner {
    fs: Arc<dyn Fs>,
    // Snapshot shared with the foreground worktree.
    snapshot: Arc<Mutex<Snapshot>>,
    // Channel for `Scanning`/`Idle`/`Err` state transitions.
    notify: Sender<ScanState>,
    executor: Arc<executor::Background>,
}
2243
2244impl BackgroundScanner {
2245 fn new(
2246 snapshot: Arc<Mutex<Snapshot>>,
2247 notify: Sender<ScanState>,
2248 fs: Arc<dyn Fs>,
2249 executor: Arc<executor::Background>,
2250 ) -> Self {
2251 Self {
2252 fs,
2253 snapshot,
2254 notify,
2255 executor,
2256 }
2257 }
2258
    /// Absolute root path of the worktree being scanned.
    fn abs_path(&self) -> Arc<Path> {
        self.snapshot.lock().abs_path.clone()
    }

    /// A point-in-time copy of the shared snapshot.
    fn snapshot(&self) -> Snapshot {
        self.snapshot.lock().clone()
    }
2266
    /// Main loop of the scanner: performs the initial recursive scan, then
    /// processes filesystem events forever, bracketing each batch of work with
    /// `Scanning`/`Idle` notifications. Exits once the notify receiver drops.
    async fn run(mut self, events_rx: impl Stream<Item = Vec<fsevent::Event>>) {
        if self.notify.send(ScanState::Scanning).await.is_err() {
            return;
        }

        if let Err(err) = self.scan_dirs().await {
            // Report the failure, but keep running to process later events.
            if self
                .notify
                .send(ScanState::Err(Arc::new(err)))
                .await
                .is_err()
            {
                return;
            }
        }

        if self.notify.send(ScanState::Idle).await.is_err() {
            return;
        }

        futures::pin_mut!(events_rx);
        while let Some(events) = events_rx.next().await {
            if self.notify.send(ScanState::Scanning).await.is_err() {
                break;
            }

            if !self.process_events(events).await {
                break;
            }

            if self.notify.send(ScanState::Idle).await.is_err() {
                break;
            }
        }
    }
2302
    /// Recursively scans the worktree root by fanning a queue of directory
    /// jobs out across one worker task per CPU.
    async fn scan_dirs(&mut self) -> Result<()> {
        let root_char_bag;
        let next_entry_id;
        let is_dir;
        {
            // Copy what we need out of the snapshot, then release the lock
            // before any scanning begins.
            let snapshot = self.snapshot.lock();
            root_char_bag = snapshot.root_char_bag;
            next_entry_id = snapshot.next_entry_id.clone();
            is_dir = snapshot.root_entry().map_or(false, |e| e.is_dir())
        };

        if is_dir {
            let path: Arc<Path> = Arc::from(Path::new(""));
            let abs_path = self.abs_path();
            let (tx, rx) = channel::unbounded();
            tx.send(ScanJob {
                abs_path: abs_path.to_path_buf(),
                path,
                ignore_stack: IgnoreStack::none(),
                scan_queue: tx.clone(),
            })
            .await
            .unwrap();
            // Jobs carry clones of `tx` for enqueueing subdirectories, so
            // dropping ours lets the channel close once all work is done.
            drop(tx);

            self.executor
                .scoped(|scope| {
                    for _ in 0..self.executor.num_cpus() {
                        scope.spawn(async {
                            while let Ok(job) = rx.recv().await {
                                if let Err(err) = self
                                    .scan_dir(root_char_bag, next_entry_id.clone(), &job)
                                    .await
                                {
                                    log::error!("error scanning {:?}: {}", job.abs_path, err);
                                }
                            }
                        });
                    }
                })
                .await;
        }

        Ok(())
    }
2348
    /// Scans a single directory, recording an entry for each child and
    /// queueing a further `ScanJob` for every child directory.
    ///
    /// If the directory contains a `.gitignore`, it is parsed and pushed onto
    /// the ignore stack, and the ignore status of any children processed
    /// before it was encountered is recomputed. Results are written into the
    /// shared snapshot via `populate_dir` before child jobs are enqueued.
    async fn scan_dir(
        &self,
        root_char_bag: CharBag,
        next_entry_id: Arc<AtomicUsize>,
        job: &ScanJob,
    ) -> Result<()> {
        let mut new_entries: Vec<Entry> = Vec::new();
        let mut new_jobs: Vec<ScanJob> = Vec::new();
        let mut ignore_stack = job.ignore_stack.clone();
        let mut new_ignore = None;

        let mut child_paths = self.fs.read_dir(&job.abs_path).await?;
        while let Some(child_abs_path) = child_paths.next().await {
            let child_abs_path = match child_abs_path {
                Ok(child_abs_path) => child_abs_path,
                Err(error) => {
                    // A single unreadable entry shouldn't abort the whole
                    // directory scan.
                    log::error!("error processing entry {:?}", error);
                    continue;
                }
            };
            let child_name = child_abs_path.file_name().unwrap();
            let child_path: Arc<Path> = job.path.join(child_name).into();
            let child_metadata = match self.fs.metadata(&child_abs_path).await? {
                Some(metadata) => metadata,
                // The entry vanished between listing and stat; skip it.
                None => continue,
            };

            // If we find a .gitignore, add it to the stack of ignores used to determine which paths are ignored
            if child_name == *GITIGNORE {
                match build_gitignore(&child_abs_path, self.fs.as_ref()) {
                    Ok(ignore) => {
                        let ignore = Arc::new(ignore);
                        ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
                        new_ignore = Some(ignore);
                    }
                    Err(error) => {
                        log::error!(
                            "error loading .gitignore file {:?} - {:?}",
                            child_name,
                            error
                        );
                    }
                }

                // Update ignore status of any child entries we've already processed to reflect the
                // ignore file in the current directory. Because `.gitignore` starts with a `.`,
                // there should rarely be many such entries. Update the ignore stack associated
                // with any new jobs as well.
                let mut new_jobs = new_jobs.iter_mut();
                for entry in &mut new_entries {
                    entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
                    if entry.is_dir() {
                        // `new_jobs` holds one job per directory entry pushed
                        // so far, in the same order, so the two iterators
                        // advance in lockstep here.
                        new_jobs.next().unwrap().ignore_stack = if entry.is_ignored {
                            IgnoreStack::all()
                        } else {
                            ignore_stack.clone()
                        };
                    }
                }
            }

            let mut child_entry = Entry::new(
                child_path.clone(),
                &child_metadata,
                &next_entry_id,
                root_char_bag,
            );

            if child_metadata.is_dir {
                let is_ignored = ignore_stack.is_path_ignored(&child_path, true);
                child_entry.is_ignored = is_ignored;
                new_entries.push(child_entry);
                new_jobs.push(ScanJob {
                    abs_path: child_abs_path,
                    path: child_path,
                    // Once a directory is ignored, everything beneath it is
                    // ignored too, so short-circuit with `IgnoreStack::all`.
                    ignore_stack: if is_ignored {
                        IgnoreStack::all()
                    } else {
                        ignore_stack.clone()
                    },
                    scan_queue: job.scan_queue.clone(),
                });
            } else {
                child_entry.is_ignored = ignore_stack.is_path_ignored(&child_path, false);
                new_entries.push(child_entry);
            };
        }

        self.snapshot
            .lock()
            .populate_dir(job.path.clone(), new_entries, new_ignore);
        // Enqueue subdirectory jobs only after this directory's entries are in
        // the snapshot, so workers never observe a child before its parent.
        for new_job in new_jobs {
            job.scan_queue.send(new_job).await.unwrap();
        }

        Ok(())
    }
2446
    /// Incorporates a batch of file-system events into the snapshot.
    ///
    /// Returns `false` when the scanner should stop (the root path could not
    /// be canonicalized, e.g. because it was deleted). Event paths are first
    /// removed from a working copy of the snapshot, then re-inserted with
    /// fresh metadata if they still exist on disk; newly-seen directories are
    /// rescanned in parallel, and ignore statuses are recomputed last.
    async fn process_events(&mut self, mut events: Vec<fsevent::Event>) -> bool {
        // Work on a copy of the snapshot and swap it in wholesale below.
        let mut snapshot = self.snapshot();
        snapshot.scan_id += 1;

        let root_abs_path = if let Ok(abs_path) = self.fs.canonicalize(&snapshot.abs_path).await {
            abs_path
        } else {
            return false;
        };
        let root_char_bag = snapshot.root_char_bag;
        let next_entry_id = snapshot.next_entry_id.clone();

        // Sort, then collapse events so that an event for a directory
        // subsumes events for paths inside it.
        events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
        events.dedup_by(|a, b| a.path.starts_with(&b.path));

        // First pass: remove every affected path from the working snapshot.
        for event in &events {
            match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => snapshot.remove_path(&path),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            }
        }

        // Second pass: re-insert entries that still exist on disk, queueing a
        // rescan job for each directory.
        let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
        for event in events {
            let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => Arc::from(path.to_path_buf()),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            };

            match self.fs.metadata(&event.path).await {
                Ok(Some(metadata)) => {
                    let ignore_stack = snapshot.ignore_stack_for_path(&path, metadata.is_dir);
                    let mut fs_entry = Entry::new(
                        path.clone(),
                        &metadata,
                        snapshot.next_entry_id.as_ref(),
                        snapshot.root_char_bag,
                    );
                    fs_entry.is_ignored = ignore_stack.is_all();
                    snapshot.insert_entry(fs_entry, self.fs.as_ref());
                    if metadata.is_dir {
                        scan_queue_tx
                            .send(ScanJob {
                                abs_path: event.path,
                                path,
                                ignore_stack,
                                scan_queue: scan_queue_tx.clone(),
                            })
                            .await
                            .unwrap();
                    }
                }
                // The path no longer exists; the removal above suffices.
                Ok(None) => {}
                Err(err) => {
                    // TODO - create a special 'error' entry in the entries tree to mark this
                    log::error!("error reading file on event {:?}", err);
                }
            }
        }

        // Publish the updated snapshot before scanning new directories.
        *self.snapshot.lock() = snapshot;

        // Scan any directories that were created as part of this event batch.
        drop(scan_queue_tx);
        self.executor
            .scoped(|scope| {
                for _ in 0..self.executor.num_cpus() {
                    scope.spawn(async {
                        while let Ok(job) = scan_queue_rx.recv().await {
                            if let Err(err) = self
                                .scan_dir(root_char_bag, next_entry_id.clone(), &job)
                                .await
                            {
                                log::error!("error scanning {:?}: {}", job.abs_path, err);
                            }
                        }
                    });
                }
            })
            .await;

        // Attempt to detect renames only over a single batch of file-system events.
        self.snapshot.lock().removed_entry_ids.clear();

        self.update_ignore_statuses().await;
        true
    }
2548
    /// Recomputes ignore statuses after a batch of events: drops cached
    /// `.gitignore` data for ignore files that were deleted, and re-applies
    /// any ignore files touched during the current scan to their subtrees.
    async fn update_ignore_statuses(&self) {
        let mut snapshot = self.snapshot();

        let mut ignores_to_update = Vec::new();
        let mut ignores_to_delete = Vec::new();
        for (parent_path, (_, scan_id)) in &snapshot.ignores {
            // An ignore file touched in this scan whose directory still
            // exists needs its subtree's statuses re-evaluated.
            if *scan_id == snapshot.scan_id && snapshot.entry_for_path(parent_path).is_some() {
                ignores_to_update.push(parent_path.clone());
            }

            // If the .gitignore file itself is gone, drop its cached data.
            let ignore_path = parent_path.join(&*GITIGNORE);
            if snapshot.entry_for_path(ignore_path).is_none() {
                ignores_to_delete.push(parent_path.clone());
            }
        }

        for parent_path in ignores_to_delete {
            snapshot.ignores.remove(&parent_path);
            self.snapshot.lock().ignores.remove(&parent_path);
        }

        // Queue one update job per changed ignore root. Roots nested inside
        // another queued root are skipped here, since the outer job will
        // recurse into them anyway.
        let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
        ignores_to_update.sort_unstable();
        let mut ignores_to_update = ignores_to_update.into_iter().peekable();
        while let Some(parent_path) = ignores_to_update.next() {
            while ignores_to_update
                .peek()
                .map_or(false, |p| p.starts_with(&parent_path))
            {
                ignores_to_update.next().unwrap();
            }

            let ignore_stack = snapshot.ignore_stack_for_path(&parent_path, true);
            ignore_queue_tx
                .send(UpdateIgnoreStatusJob {
                    path: parent_path,
                    ignore_stack,
                    ignore_queue: ignore_queue_tx.clone(),
                })
                .await
                .unwrap();
        }
        // Close the channel so the workers below terminate once the queue
        // (including recursively-enqueued jobs) drains.
        drop(ignore_queue_tx);

        self.executor
            .scoped(|scope| {
                for _ in 0..self.executor.num_cpus() {
                    scope.spawn(async {
                        while let Ok(job) = ignore_queue_rx.recv().await {
                            self.update_ignore_status(job, &snapshot).await;
                        }
                    });
                }
            })
            .await;
    }
2605
    /// Re-evaluates the ignore status of the direct children of `job.path`,
    /// writing any changed entries back into the live snapshot and enqueueing
    /// a recursive job for each child directory.
    async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) {
        let mut ignore_stack = job.ignore_stack;
        // Include this directory's own .gitignore (if any) when evaluating
        // its children.
        if let Some((ignore, _)) = snapshot.ignores.get(&job.path) {
            ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
        }

        let mut entries_by_id_edits = Vec::new();
        let mut entries_by_path_edits = Vec::new();
        for mut entry in snapshot.child_entries(&job.path).cloned() {
            let was_ignored = entry.is_ignored;
            entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
            if entry.is_dir() {
                let child_ignore_stack = if entry.is_ignored {
                    IgnoreStack::all()
                } else {
                    ignore_stack.clone()
                };
                job.ignore_queue
                    .send(UpdateIgnoreStatusJob {
                        path: entry.path.clone(),
                        ignore_stack: child_ignore_stack,
                        ignore_queue: job.ignore_queue.clone(),
                    })
                    .await
                    .unwrap();
            }

            // Only entries whose status actually changed need edits in the
            // two entry trees.
            if entry.is_ignored != was_ignored {
                let mut path_entry = snapshot.entries_by_id.get(&entry.id, &()).unwrap().clone();
                path_entry.scan_id = snapshot.scan_id;
                path_entry.is_ignored = entry.is_ignored;
                entries_by_id_edits.push(Edit::Insert(path_entry));
                entries_by_path_edits.push(Edit::Insert(entry));
            }
        }

        let mut snapshot = self.snapshot.lock();
        snapshot.entries_by_path.edit(entries_by_path_edits, &());
        snapshot.entries_by_id.edit(entries_by_id_edits, &());
    }
2646}
2647
2648async fn refresh_entry(
2649 fs: &dyn Fs,
2650 snapshot: &Mutex<Snapshot>,
2651 path: Arc<Path>,
2652 abs_path: &Path,
2653) -> Result<Entry> {
2654 let root_char_bag;
2655 let next_entry_id;
2656 {
2657 let snapshot = snapshot.lock();
2658 root_char_bag = snapshot.root_char_bag;
2659 next_entry_id = snapshot.next_entry_id.clone();
2660 }
2661 let entry = Entry::new(
2662 path,
2663 &fs.metadata(abs_path)
2664 .await?
2665 .ok_or_else(|| anyhow!("could not read saved file metadata"))?,
2666 &next_entry_id,
2667 root_char_bag,
2668 );
2669 Ok(snapshot.lock().insert_entry(entry, fs))
2670}
2671
2672fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
2673 let mut result = root_char_bag;
2674 result.extend(
2675 path.to_string_lossy()
2676 .chars()
2677 .map(|c| c.to_ascii_lowercase()),
2678 );
2679 result
2680}
2681
/// A unit of work for the directory-scanning workers: scan the directory at
/// `abs_path` and enqueue further jobs for its subdirectories.
struct ScanJob {
    /// Absolute filesystem path of the directory to scan.
    abs_path: PathBuf,
    /// The same directory, relative to the worktree root.
    path: Arc<Path>,
    /// Gitignore state inherited from ancestor directories.
    ignore_stack: Arc<IgnoreStack>,
    /// Sender used to enqueue subdirectory jobs; each in-flight job holding a
    /// clone keeps the queue's channel open until the scan completes.
    scan_queue: Sender<ScanJob>,
}
2688
/// A unit of work for re-evaluating ignore statuses: recompute the status of
/// the children of `path` using `ignore_stack`, recursing via `ignore_queue`.
struct UpdateIgnoreStatusJob {
    /// Directory (relative to the worktree root) whose children to update.
    path: Arc<Path>,
    /// Gitignore state inherited from ancestor directories.
    ignore_stack: Arc<IgnoreStack>,
    /// Sender used to enqueue jobs for child directories.
    ignore_queue: Sender<UpdateIgnoreStatusJob>,
}
2694
/// Test-only extensions for worktree model handles.
pub trait WorktreeHandle {
    /// Waits until all file-system events already pending for the worktree's
    /// root directory have been observed and processed.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()>;
}
2702
impl WorktreeHandle for ModelHandle<Worktree> {
    // The worktree's FS event stream sometimes delivers "redundant" events for FS changes that
    // occurred before the worktree was constructed. These events can cause the worktree to perform
    // extra directory scans, and emit extra scan-state notifications.
    //
    // This function mutates the worktree's directory and waits for those mutations to be picked up,
    // to ensure that all redundant FS events have already been processed.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()> {
        use smol::future::FutureExt;

        // Create and then delete a sentinel file, waiting for each change to
        // appear in the worktree. Once both round-trips complete, any earlier
        // FS events must have been processed as well.
        let filename = "fs-event-sentinel";
        let root_path = cx.read(|cx| self.read(cx).abs_path.clone());
        let tree = self.clone();
        async move {
            std::fs::write(root_path.join(filename), "").unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_some())
                .await;

            std::fs::remove_file(root_path.join(filename)).unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_none())
                .await;

            cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
                .await;
        }
        .boxed_local()
    }
}
2735
/// Cumulative counts accumulated while seeking through the entry tree; used
/// as a `sum_tree` dimension to position a `Traversal` cursor.
#[derive(Clone, Debug)]
struct TraversalProgress<'a> {
    /// Greatest entry path covered so far.
    max_path: &'a Path,
    /// Total number of entries.
    count: usize,
    /// Entries that are not gitignored.
    visible_count: usize,
    /// File (non-directory) entries.
    file_count: usize,
    /// File entries that are not gitignored.
    visible_file_count: usize,
}
2744
2745impl<'a> TraversalProgress<'a> {
2746 fn count(&self, include_dirs: bool, include_ignored: bool) -> usize {
2747 match (include_ignored, include_dirs) {
2748 (true, true) => self.count,
2749 (true, false) => self.file_count,
2750 (false, true) => self.visible_count,
2751 (false, false) => self.visible_file_count,
2752 }
2753 }
2754}
2755
/// Lets `TraversalProgress` serve as a cursor dimension over the entry tree
/// by accumulating each node summary's counts and max path.
impl<'a> sum_tree::Dimension<'a, EntrySummary> for TraversalProgress<'a> {
    fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
        self.max_path = summary.max_path.as_ref();
        self.count += summary.count;
        self.visible_count += summary.visible_count;
        self.file_count += summary.file_count;
        self.visible_file_count += summary.visible_file_count;
    }
}
2765
2766impl<'a> Default for TraversalProgress<'a> {
2767 fn default() -> Self {
2768 Self {
2769 max_path: Path::new(""),
2770 count: 0,
2771 visible_count: 0,
2772 file_count: 0,
2773 visible_file_count: 0,
2774 }
2775 }
2776}
2777
/// Cursor-based iterator over worktree entries in path order, optionally
/// filtering out directories and/or gitignored entries.
pub struct Traversal<'a> {
    cursor: sum_tree::Cursor<'a, Entry, TraversalProgress<'a>>,
    include_ignored: bool,
    include_dirs: bool,
}
2783
impl<'a> Traversal<'a> {
    /// Advances to the next entry matching this traversal's filters.
    pub fn advance(&mut self) -> bool {
        self.advance_to_offset(self.offset() + 1)
    }

    /// Seeks forward until the filtered entry count reaches `offset`.
    pub fn advance_to_offset(&mut self, offset: usize) -> bool {
        self.cursor.seek_forward(
            &TraversalTarget::Count {
                count: offset,
                include_dirs: self.include_dirs,
                include_ignored: self.include_ignored,
            },
            Bias::Right,
            &(),
        )
    }

    /// Skips past the current entry and all of its descendants, stopping at
    /// the next filter-matching entry outside the current entry's subtree.
    pub fn advance_to_sibling(&mut self) -> bool {
        while let Some(entry) = self.cursor.item() {
            self.cursor.seek_forward(
                &TraversalTarget::PathSuccessor(&entry.path),
                Bias::Left,
                &(),
            );
            // If the entry we landed on is excluded by the filters, loop and
            // seek past its subtree as well.
            if let Some(entry) = self.cursor.item() {
                if (self.include_dirs || !entry.is_dir())
                    && (self.include_ignored || !entry.is_ignored)
                {
                    return true;
                }
            }
        }
        false
    }

    /// The entry at the cursor's current position, if any.
    pub fn entry(&self) -> Option<&'a Entry> {
        self.cursor.item()
    }

    /// Number of filter-matching entries preceding the cursor's position.
    pub fn offset(&self) -> usize {
        self.cursor
            .start()
            .count(self.include_dirs, self.include_ignored)
    }
}
2829
2830impl<'a> Iterator for Traversal<'a> {
2831 type Item = &'a Entry;
2832
2833 fn next(&mut self) -> Option<Self::Item> {
2834 if let Some(item) = self.entry() {
2835 self.advance();
2836 Some(item)
2837 } else {
2838 None
2839 }
2840 }
2841}
2842
/// Seek targets used to position a `Traversal` cursor within the entry tree.
#[derive(Debug)]
enum TraversalTarget<'a> {
    /// Seek to the given path.
    Path(&'a Path),
    /// Seek to the first entry that is not the path itself or a descendant.
    PathSuccessor(&'a Path),
    /// Seek until the filtered entry count reaches `count`.
    Count {
        count: usize,
        include_ignored: bool,
        include_dirs: bool,
    },
}
2853
2854impl<'a, 'b> SeekTarget<'a, EntrySummary, TraversalProgress<'a>> for TraversalTarget<'b> {
2855 fn cmp(&self, cursor_location: &TraversalProgress<'a>, _: &()) -> Ordering {
2856 match self {
2857 TraversalTarget::Path(path) => path.cmp(&cursor_location.max_path),
2858 TraversalTarget::PathSuccessor(path) => {
2859 if !cursor_location.max_path.starts_with(path) {
2860 Ordering::Equal
2861 } else {
2862 Ordering::Greater
2863 }
2864 }
2865 TraversalTarget::Count {
2866 count,
2867 include_dirs,
2868 include_ignored,
2869 } => Ord::cmp(
2870 count,
2871 &cursor_location.count(*include_dirs, *include_ignored),
2872 ),
2873 }
2874 }
2875}
2876
/// Iterator over the direct children of `parent_path`, implemented by
/// repeatedly skipping to the next sibling within a `Traversal`.
struct ChildEntriesIter<'a> {
    parent_path: &'a Path,
    traversal: Traversal<'a>,
}
2881
2882impl<'a> Iterator for ChildEntriesIter<'a> {
2883 type Item = &'a Entry;
2884
2885 fn next(&mut self) -> Option<Self::Item> {
2886 if let Some(item) = self.traversal.entry() {
2887 if item.path.starts_with(&self.parent_path) {
2888 self.traversal.advance_to_sibling();
2889 return Some(item);
2890 }
2891 }
2892 None
2893 }
2894}
2895
2896impl<'a> From<&'a Entry> for proto::Entry {
2897 fn from(entry: &'a Entry) -> Self {
2898 Self {
2899 id: entry.id as u64,
2900 is_dir: entry.is_dir(),
2901 path: entry.path.to_string_lossy().to_string(),
2902 inode: entry.inode,
2903 mtime: Some(entry.mtime.into()),
2904 is_symlink: entry.is_symlink,
2905 is_ignored: entry.is_ignored,
2906 }
2907 }
2908}
2909
2910impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
2911 type Error = anyhow::Error;
2912
2913 fn try_from((root_char_bag, entry): (&'a CharBag, proto::Entry)) -> Result<Self> {
2914 if let Some(mtime) = entry.mtime {
2915 let kind = if entry.is_dir {
2916 EntryKind::Dir
2917 } else {
2918 let mut char_bag = root_char_bag.clone();
2919 char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase()));
2920 EntryKind::File(char_bag)
2921 };
2922 let path: Arc<Path> = Arc::from(Path::new(&entry.path));
2923 Ok(Entry {
2924 id: entry.id as usize,
2925 kind,
2926 path: path.clone(),
2927 inode: entry.inode,
2928 mtime: mtime.into(),
2929 is_symlink: entry.is_symlink,
2930 is_ignored: entry.is_ignored,
2931 })
2932 } else {
2933 Err(anyhow!(
2934 "missing mtime in remote worktree entry {:?}",
2935 entry.path
2936 ))
2937 }
2938 }
2939}
2940
2941#[cfg(test)]
2942mod tests {
2943 use super::*;
2944 use crate::fs::FakeFs;
2945 use anyhow::Result;
2946 use buffer::Point;
2947 use client::test::FakeServer;
2948 use fs::RealFs;
2949 use language::{tree_sitter_rust, LanguageServerConfig};
2950 use language::{Diagnostic, LanguageConfig};
2951 use lsp::Url;
2952 use rand::prelude::*;
2953 use serde_json::json;
2954 use std::{cell::RefCell, rc::Rc};
2955 use std::{
2956 env,
2957 fmt::Write,
2958 time::{SystemTime, UNIX_EPOCH},
2959 };
2960 use util::test::temp_tree;
2961
    /// End-to-end check that a scanned worktree's `entries(false)` traversal
    /// yields visible paths in order and omits gitignored ones (`a/b`).
    #[gpui::test]
    async fn test_traversal(cx: gpui::TestAppContext) {
        let fs = FakeFs::new();
        fs.insert_tree(
            "/root",
            json!({
                ".gitignore": "a/b\n",
                "a": {
                    "b": "",
                    "c": "",
                }
            }),
        )
        .await;

        let tree = Worktree::open_local(
            Client::new(),
            Arc::from(Path::new("/root")),
            Arc::new(fs),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        // Wait for the initial background scan before asserting.
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;

        tree.read_with(&cx, |tree, _| {
            assert_eq!(
                tree.entries(false)
                    .map(|entry| entry.path.as_ref())
                    .collect::<Vec<_>>(),
                vec![
                    Path::new(""),
                    Path::new(".gitignore"),
                    Path::new("a"),
                    Path::new("a/c"),
                ]
            );
        })
    }
3003
    /// Saving an edited buffer should write its full contents back to disk.
    #[gpui::test]
    async fn test_save_file(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));
        let tree = Worktree::open_local(
            Client::new(),
            dir.path(),
            Arc::new(RealFs),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        let save = buffer.update(&mut cx, |buffer, cx| {
            // A large edit so the save writes a non-trivial amount of data.
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        let new_text = std::fs::read_to_string(dir.path().join("file1")).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
3031
    /// Same as `test_save_file`, but with a worktree rooted at a single file
    /// rather than a directory (the buffer is opened with an empty path).
    #[gpui::test]
    async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));
        let file_path = dir.path().join("file1");

        let tree = Worktree::open_local(
            Client::new(),
            file_path.clone(),
            Arc::new(RealFs),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        // A single-file worktree should contain exactly one file.
        cx.read(|cx| assert_eq!(tree.read(cx).file_count(), 1));

        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("", cx))
            .await
            .unwrap();
        let save = buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        let new_text = std::fs::read_to_string(file_path).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
3065
    /// After files are renamed/deleted on disk, entry ids should be preserved
    /// across renames, open buffers should track their files' new paths (or
    /// be marked deleted), and a remote replica should reach the same state
    /// after applying the snapshot's update message.
    #[gpui::test]
    async fn test_rescan_and_remote_updates(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "a": {
                "file1": "",
                "file2": "",
                "file3": "",
            },
            "b": {
                "c": {
                    "file4": "",
                    "file5": "",
                }
            }
        }));

        let user_id = 5;
        let mut client = Client::new();
        let server = FakeServer::for_client(user_id, &mut client, &cx).await;
        let tree = Worktree::open_local(
            client,
            dir.path(),
            Arc::new(RealFs),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| {
            let buffer = tree.update(cx, |tree, cx| tree.open_buffer(path, cx));
            async move { buffer.await.unwrap() }
        };
        let id_for_path = |path: &'static str, cx: &gpui::TestAppContext| {
            tree.read_with(cx, |tree, _| {
                tree.entry_for_path(path)
                    .expect(&format!("no entry for path {}", path))
                    .id
            })
        };

        let buffer2 = buffer_for_path("a/file2", &mut cx).await;
        let buffer3 = buffer_for_path("a/file3", &mut cx).await;
        let buffer4 = buffer_for_path("b/c/file4", &mut cx).await;
        let buffer5 = buffer_for_path("b/c/file5", &mut cx).await;

        let file2_id = id_for_path("a/file2", &cx);
        let file3_id = id_for_path("a/file3", &cx);
        let file4_id = id_for_path("b/c/file4", &cx);

        // Wait for the initial scan.
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;

        // Create a remote copy of this worktree.
        let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
        let worktree_id = 1;
        let share_request = tree.update(&mut cx, |tree, cx| {
            tree.as_local().unwrap().share_request(cx)
        });
        let open_worktree = server.receive::<proto::OpenWorktree>().await.unwrap();
        server
            .respond(
                open_worktree.receipt(),
                proto::OpenWorktreeResponse { worktree_id: 1 },
            )
            .await;

        let remote = Worktree::remote(
            proto::JoinWorktreeResponse {
                worktree: share_request.await.unwrap().worktree,
                replica_id: 1,
                peers: Vec::new(),
            },
            Client::new(),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        cx.read(|cx| {
            assert!(!buffer2.read(cx).is_dirty());
            assert!(!buffer3.read(cx).is_dirty());
            assert!(!buffer4.read(cx).is_dirty());
            assert!(!buffer5.read(cx).is_dirty());
        });

        // Rename and delete files and directories.
        tree.flush_fs_events(&cx).await;
        std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
        std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
        std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap();
        std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
        tree.flush_fs_events(&cx).await;

        let expected_paths = vec![
            "a",
            "a/file1",
            "a/file2.new",
            "b",
            "d",
            "d/file3",
            "d/file4",
        ];

        cx.read(|app| {
            assert_eq!(
                tree.read(app)
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );

            // Entry ids survive renames (rename detection within one batch).
            assert_eq!(id_for_path("a/file2.new", &cx), file2_id);
            assert_eq!(id_for_path("d/file3", &cx), file3_id);
            assert_eq!(id_for_path("d/file4", &cx), file4_id);

            // Open buffers follow their files to the new paths.
            assert_eq!(
                buffer2.read(app).file().unwrap().path().as_ref(),
                Path::new("a/file2.new")
            );
            assert_eq!(
                buffer3.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file3")
            );
            assert_eq!(
                buffer4.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file4")
            );
            assert_eq!(
                buffer5.read(app).file().unwrap().path().as_ref(),
                Path::new("b/c/file5")
            );

            assert!(!buffer2.read(app).file().unwrap().is_deleted());
            assert!(!buffer3.read(app).file().unwrap().is_deleted());
            assert!(!buffer4.read(app).file().unwrap().is_deleted());
            assert!(buffer5.read(app).file().unwrap().is_deleted());
        });

        // Update the remote worktree. Check that it becomes consistent with the
        // local worktree.
        remote.update(&mut cx, |remote, cx| {
            let update_message =
                tree.read(cx)
                    .snapshot()
                    .build_update(&initial_snapshot, worktree_id, true);
            remote
                .as_remote_mut()
                .unwrap()
                .snapshot
                .apply_update(update_message)
                .unwrap();

            assert_eq!(
                remote
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );
        });
    }
3231
    /// Ignore statuses should be applied both during the initial scan and to
    /// files created afterwards; `.git` should always be treated as ignored.
    #[gpui::test]
    async fn test_rescan_with_gitignore(cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            ".git": {},
            ".gitignore": "ignored-dir\n",
            "tracked-dir": {
                "tracked-file1": "tracked contents",
            },
            "ignored-dir": {
                "ignored-file1": "ignored contents",
            }
        }));

        let tree = Worktree::open_local(
            Client::new(),
            dir.path(),
            Arc::new(RealFs),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        tree.flush_fs_events(&cx).await;
        // Statuses assigned during the initial scan.
        cx.read(|cx| {
            let tree = tree.read(cx);
            let tracked = tree.entry_for_path("tracked-dir/tracked-file1").unwrap();
            let ignored = tree.entry_for_path("ignored-dir/ignored-file1").unwrap();
            assert_eq!(tracked.is_ignored, false);
            assert_eq!(ignored.is_ignored, true);
        });

        // Statuses assigned when files appear after the initial scan.
        std::fs::write(dir.path().join("tracked-dir/tracked-file2"), "").unwrap();
        std::fs::write(dir.path().join("ignored-dir/ignored-file2"), "").unwrap();
        tree.flush_fs_events(&cx).await;
        cx.read(|cx| {
            let tree = tree.read(cx);
            let dot_git = tree.entry_for_path(".git").unwrap();
            let tracked = tree.entry_for_path("tracked-dir/tracked-file2").unwrap();
            let ignored = tree.entry_for_path("ignored-dir/ignored-file2").unwrap();
            assert_eq!(tracked.is_ignored, false);
            assert_eq!(ignored.is_ignored, true);
            assert_eq!(dot_git.is_ignored, true);
        });
    }
3278
    /// Opening a local worktree and connecting should send an `OpenWorktree`
    /// request (with the root name and collaborators from `.zed.toml`),
    /// record the server-assigned remote id, and send `CloseWorktree` when
    /// the worktree is dropped.
    #[gpui::test]
    async fn test_open_and_share_worktree(mut cx: gpui::TestAppContext) {
        let user_id = 100;
        let mut client = Client::new();
        let server = FakeServer::for_client(user_id, &mut client, &cx).await;

        let fs = Arc::new(FakeFs::new());
        fs.insert_tree(
            "/path",
            json!({
                "to": {
                    "the-dir": {
                        ".zed.toml": r#"collaborators = ["friend-1", "friend-2"]"#,
                        "a.txt": "a-contents",
                    },
                },
            }),
        )
        .await;

        let worktree = Worktree::open_local(
            client.clone(),
            "/path/to/the-dir".as_ref(),
            fs,
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        {
            let cx = cx.to_async();
            client.authenticate_and_connect(&cx).await.unwrap();
        }

        let open_worktree = server.receive::<proto::OpenWorktree>().await.unwrap();
        assert_eq!(
            open_worktree.payload,
            proto::OpenWorktree {
                root_name: "the-dir".to_string(),
                collaborator_logins: vec!["friend-1".to_string(), "friend-2".to_string()],
            }
        );

        server
            .respond(
                open_worktree.receipt(),
                proto::OpenWorktreeResponse { worktree_id: 5 },
            )
            .await;
        let remote_id = worktree
            .update(&mut cx, |tree, _| tree.as_local().unwrap().next_remote_id())
            .await;
        assert_eq!(remote_id, Some(5));

        // Dropping the worktree should close it on the server.
        cx.update(move |_| drop(worktree));
        server.receive::<proto::CloseWorktree>().await.unwrap();
    }
3337
    /// Exercises the buffer dirty-state machine: editing dirties a buffer and
    /// emits `Dirtied`; saving cleans it and emits `Saved`; deleting the
    /// backing file dirties a clean buffer but not an already-dirty one.
    #[gpui::test]
    async fn test_buffer_is_dirty(mut cx: gpui::TestAppContext) {
        use std::fs;

        let dir = temp_tree(json!({
            "file1": "abc",
            "file2": "def",
            "file3": "ghi",
        }));
        let tree = Worktree::open_local(
            Client::new(),
            dir.path(),
            Arc::new(RealFs),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        tree.flush_fs_events(&cx).await;
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;

        let buffer1 = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        let events = Rc::new(RefCell::new(Vec::new()));

        // initially, the buffer isn't dirty.
        buffer1.update(&mut cx, |buffer, cx| {
            cx.subscribe(&buffer1, {
                let events = events.clone();
                move |_, _, event, _| events.borrow_mut().push(event.clone())
            })
            .detach();

            assert!(!buffer.is_dirty());
            assert!(events.borrow().is_empty());

            buffer.edit(vec![1..2], "", cx);
        });

        // after the first edit, the buffer is dirty, and emits a dirtied event.
        buffer1.update(&mut cx, |buffer, cx| {
            assert!(buffer.text() == "ac");
            assert!(buffer.is_dirty());
            assert_eq!(
                *events.borrow(),
                &[language::Event::Edited, language::Event::Dirtied]
            );
            events.borrow_mut().clear();
            buffer.did_save(buffer.version(), buffer.file().unwrap().mtime(), None, cx);
        });

        // after saving, the buffer is not dirty, and emits a saved event.
        buffer1.update(&mut cx, |buffer, cx| {
            assert!(!buffer.is_dirty());
            assert_eq!(*events.borrow(), &[language::Event::Saved]);
            events.borrow_mut().clear();

            buffer.edit(vec![1..1], "B", cx);
            buffer.edit(vec![2..2], "D", cx);
        });

        // after editing again, the buffer is dirty, and emits another dirty event.
        buffer1.update(&mut cx, |buffer, cx| {
            assert!(buffer.text() == "aBDc");
            assert!(buffer.is_dirty());
            assert_eq!(
                *events.borrow(),
                &[
                    language::Event::Edited,
                    language::Event::Dirtied,
                    language::Event::Edited
                ],
            );
            events.borrow_mut().clear();

            // TODO - currently, after restoring the buffer to its
            // previously-saved state, the buffer is still considered dirty.
            buffer.edit(vec![1..3], "", cx);
            assert!(buffer.text() == "ac");
            assert!(buffer.is_dirty());
        });

        assert_eq!(*events.borrow(), &[language::Event::Edited]);

        // When a file is deleted, the buffer is considered dirty.
        let events = Rc::new(RefCell::new(Vec::new()));
        let buffer2 = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("file2", cx))
            .await
            .unwrap();
        buffer2.update(&mut cx, |_, cx| {
            cx.subscribe(&buffer2, {
                let events = events.clone();
                move |_, _, event, _| events.borrow_mut().push(event.clone())
            })
            .detach();
        });

        fs::remove_file(dir.path().join("file2")).unwrap();
        buffer2.condition(&cx, |b, _| b.is_dirty()).await;
        assert_eq!(
            *events.borrow(),
            &[language::Event::Dirtied, language::Event::FileHandleChanged]
        );

        // When a file is already dirty when deleted, we don't emit a Dirtied event.
        let events = Rc::new(RefCell::new(Vec::new()));
        let buffer3 = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("file3", cx))
            .await
            .unwrap();
        buffer3.update(&mut cx, |_, cx| {
            cx.subscribe(&buffer3, {
                let events = events.clone();
                move |_, _, event, _| events.borrow_mut().push(event.clone())
            })
            .detach();
        });

        tree.flush_fs_events(&cx).await;
        buffer3.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "x", cx);
        });
        events.borrow_mut().clear();
        fs::remove_file(dir.path().join("file3")).unwrap();
        buffer3
            .condition(&cx, |_, _| !events.borrow().is_empty())
            .await;
        assert_eq!(*events.borrow(), &[language::Event::FileHandleChanged]);
        cx.read(|cx| assert!(buffer3.read(cx).is_dirty()));
    }
3472
    // An unmodified buffer should be reloaded when its file changes on disk,
    // with cursors adjusted via a diff of old vs. new contents; a dirty buffer
    // must instead be flagged as conflicted, preserving the unsaved edits.
    #[gpui::test]
    async fn test_buffer_file_changes_on_disk(mut cx: gpui::TestAppContext) {
        use buffer::{Point, Selection, SelectionGoal};
        use std::fs;

        let initial_contents = "aaa\nbbbbb\nc\n";
        let dir = temp_tree(json!({ "the-file": initial_contents }));
        let tree = Worktree::open_local(
            Client::new(),
            dir.path(),
            Arc::new(RealFs),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        // Wait for the initial scan so the file entry exists before opening it.
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;

        let abs_path = dir.path().join("the-file");
        let buffer = tree
            .update(&mut cx, |tree, cx| {
                tree.open_buffer(Path::new("the-file"), cx)
            })
            .await
            .unwrap();

        // Add a cursor on each row.
        let selection_set_id = buffer.update(&mut cx, |buffer, cx| {
            assert!(!buffer.is_dirty());
            buffer.add_selection_set(
                &(0..3)
                    .map(|row| Selection {
                        id: row as usize,
                        start: Point::new(row, 1),
                        end: Point::new(row, 1),
                        reversed: false,
                        goal: SelectionGoal::None,
                    })
                    .collect::<Vec<_>>(),
                cx,
            )
        });

        // Change the file on disk, adding two new lines of text, and removing
        // one line.
        buffer.read_with(&cx, |buffer, _| {
            assert!(!buffer.is_dirty());
            assert!(!buffer.has_conflict());
        });
        let new_contents = "AAAA\naaa\nBB\nbbbbb\n";
        fs::write(&abs_path, new_contents).unwrap();

        // Because the buffer was not modified, it is reloaded from disk. Its
        // contents are edited according to the diff between the old and new
        // file contents.
        buffer
            .condition(&cx, |buffer, _| buffer.text() == new_contents)
            .await;

        buffer.update(&mut cx, |buffer, _| {
            assert_eq!(buffer.text(), new_contents);
            assert!(!buffer.is_dirty());
            assert!(!buffer.has_conflict());

            // Cursors follow their lines through the reload: "aaa" is now row
            // 1 and "bbbbb" row 3; the cursor on the removed "c" line ends up
            // at the start of the trailing empty row.
            let cursor_positions = buffer
                .selection_set(selection_set_id)
                .unwrap()
                .point_selections(&*buffer)
                .map(|selection| {
                    assert_eq!(selection.start, selection.end);
                    selection.start
                })
                .collect::<Vec<_>>();
            assert_eq!(
                cursor_positions,
                [Point::new(1, 1), Point::new(3, 1), Point::new(4, 0)]
            );
        });

        // Modify the buffer
        buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(vec![0..0], " ", cx);
            assert!(buffer.is_dirty());
            assert!(!buffer.has_conflict());
        });

        // Change the file on disk again, adding blank lines to the beginning.
        fs::write(&abs_path, "\n\n\nAAAA\naaa\nBB\nbbbbb\n").unwrap();

        // Because the buffer is modified, it doesn't reload from disk, but is
        // marked as having a conflict.
        buffer
            .condition(&cx, |buffer, _| buffer.has_conflict())
            .await;
    }
3569
3570 #[gpui::test]
3571 async fn test_language_server_diagnostics(mut cx: gpui::TestAppContext) {
3572 simplelog::SimpleLogger::init(log::LevelFilter::Info, Default::default()).unwrap();
3573
3574 let (language_server_config, mut fake_server) =
3575 LanguageServerConfig::fake(cx.background()).await;
3576 let mut languages = LanguageRegistry::new();
3577 languages.add(Arc::new(Language::new(
3578 LanguageConfig {
3579 name: "Rust".to_string(),
3580 path_suffixes: vec!["rs".to_string()],
3581 language_server: Some(language_server_config),
3582 ..Default::default()
3583 },
3584 tree_sitter_rust::language(),
3585 )));
3586
3587 let dir = temp_tree(json!({
3588 "a.rs": "fn a() { A }",
3589 "b.rs": "const y: i32 = 1",
3590 }));
3591
3592 let tree = Worktree::open_local(
3593 Client::new(),
3594 dir.path(),
3595 Arc::new(RealFs),
3596 Arc::new(languages),
3597 &mut cx.to_async(),
3598 )
3599 .await
3600 .unwrap();
3601 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3602 .await;
3603
3604 // Cause worktree to start the fake language server
3605 let _buffer = tree
3606 .update(&mut cx, |tree, cx| tree.open_buffer("b.rs", cx))
3607 .await
3608 .unwrap();
3609
3610 fake_server
3611 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
3612 uri: Url::from_file_path(dir.path().join("a.rs")).unwrap(),
3613 version: None,
3614 diagnostics: vec![lsp::Diagnostic {
3615 range: lsp::Range::new(lsp::Position::new(0, 9), lsp::Position::new(0, 10)),
3616 severity: Some(lsp::DiagnosticSeverity::ERROR),
3617 message: "undefined variable 'A'".to_string(),
3618 ..Default::default()
3619 }],
3620 })
3621 .await;
3622
3623 let buffer = tree
3624 .update(&mut cx, |tree, cx| tree.open_buffer("a.rs", cx))
3625 .await
3626 .unwrap();
3627
3628 buffer.read_with(&cx, |buffer, _| {
3629 let diagnostics = buffer
3630 .diagnostics_in_range(0..buffer.len())
3631 .collect::<Vec<_>>();
3632 assert_eq!(
3633 diagnostics,
3634 &[(
3635 Point::new(0, 9)..Point::new(0, 10),
3636 &Diagnostic {
3637 severity: lsp::DiagnosticSeverity::ERROR,
3638 message: "undefined variable 'A'".to_string()
3639 }
3640 )]
3641 )
3642 });
3643 }
3644
    // Randomized test: mutate a real temp directory while feeding the
    // resulting events to a `BackgroundScanner` in random batches, checking
    // snapshot invariants throughout. Afterwards, verify that a fresh scan
    // agrees with the incrementally-maintained snapshot, and that stale
    // snapshots can be caught up via `build_update` / `apply_update`.
    #[gpui::test(iterations = 100)]
    fn test_random(mut rng: StdRng) {
        // Both sizes can be overridden from the environment for longer runs.
        let operations = env::var("OPERATIONS")
            .map(|o| o.parse().unwrap())
            .unwrap_or(40);
        let initial_entries = env::var("INITIAL_ENTRIES")
            .map(|o| o.parse().unwrap())
            .unwrap_or(20);

        // Seed the temp directory using guaranteed insertions (probability 1.0).
        let root_dir = tempdir::TempDir::new("worktree-test").unwrap();
        for _ in 0..initial_entries {
            randomly_mutate_tree(root_dir.path(), 1.0, &mut rng).unwrap();
        }
        log::info!("Generated initial tree");

        let (notify_tx, _notify_rx) = smol::channel::unbounded();
        let fs = Arc::new(RealFs);
        let next_entry_id = Arc::new(AtomicUsize::new(0));
        // Build a snapshot containing only the root entry; keep a clone so a
        // second scanner can later start from the same baseline.
        let mut initial_snapshot = Snapshot {
            id: 0,
            scan_id: 0,
            abs_path: root_dir.path().into(),
            entries_by_path: Default::default(),
            entries_by_id: Default::default(),
            removed_entry_ids: Default::default(),
            ignores: Default::default(),
            root_name: Default::default(),
            root_char_bag: Default::default(),
            next_entry_id: next_entry_id.clone(),
        };
        initial_snapshot.insert_entry(
            Entry::new(
                Path::new("").into(),
                &smol::block_on(fs.metadata(root_dir.path()))
                    .unwrap()
                    .unwrap(),
                &next_entry_id,
                Default::default(),
            ),
            fs.as_ref(),
        );
        let mut scanner = BackgroundScanner::new(
            Arc::new(Mutex::new(initial_snapshot.clone())),
            notify_tx,
            fs.clone(),
            Arc::new(gpui::executor::Background::new()),
        );
        smol::block_on(scanner.scan_dirs()).unwrap();
        scanner.snapshot().check_invariants();

        // Interleave mutations with event delivery: 40% of the time deliver a
        // random-length prefix of the pending events, otherwise mutate again.
        let mut events = Vec::new();
        let mut snapshots = Vec::new();
        let mut mutations_len = operations;
        while mutations_len > 1 {
            if !events.is_empty() && rng.gen_bool(0.4) {
                let len = rng.gen_range(0..=events.len());
                let to_deliver = events.drain(0..len).collect::<Vec<_>>();
                log::info!("Delivering events: {:#?}", to_deliver);
                smol::block_on(scanner.process_events(to_deliver));
                scanner.snapshot().check_invariants();
            } else {
                events.extend(randomly_mutate_tree(root_dir.path(), 0.6, &mut rng).unwrap());
                mutations_len -= 1;
            }

            // Occasionally capture an intermediate snapshot for the
            // `apply_update` checks below.
            if rng.gen_bool(0.2) {
                snapshots.push(scanner.snapshot());
            }
        }
        // Flush all remaining events so the scanner reaches a settled state.
        log::info!("Quiescing: {:#?}", events);
        smol::block_on(scanner.process_events(events));
        scanner.snapshot().check_invariants();

        // A scanner that rescans from the original baseline must agree with
        // the incrementally-maintained snapshot.
        let (notify_tx, _notify_rx) = smol::channel::unbounded();
        let mut new_scanner = BackgroundScanner::new(
            Arc::new(Mutex::new(initial_snapshot)),
            notify_tx,
            scanner.fs.clone(),
            scanner.executor.clone(),
        );
        smol::block_on(new_scanner.scan_dirs()).unwrap();
        assert_eq!(
            scanner.snapshot().to_vec(true),
            new_scanner.snapshot().to_vec(true)
        );

        // Each captured snapshot — optionally stripped of ignored entries
        // first — must reach the final state after applying a built update.
        for mut prev_snapshot in snapshots {
            let include_ignored = rng.gen::<bool>();
            if !include_ignored {
                let mut entries_by_path_edits = Vec::new();
                let mut entries_by_id_edits = Vec::new();
                for entry in prev_snapshot
                    .entries_by_id
                    .cursor::<()>()
                    .filter(|e| e.is_ignored)
                {
                    entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
                    entries_by_id_edits.push(Edit::Remove(entry.id));
                }

                prev_snapshot
                    .entries_by_path
                    .edit(entries_by_path_edits, &());
                prev_snapshot.entries_by_id.edit(entries_by_id_edits, &());
            }

            let update = scanner
                .snapshot()
                .build_update(&prev_snapshot, 0, include_ignored);
            prev_snapshot.apply_update(update).unwrap();
            assert_eq!(
                prev_snapshot.to_vec(true),
                scanner.snapshot().to_vec(include_ignored)
            );
        }
    }
3761
    /// Applies one random mutation beneath `root_path` — an insertion (file or
    /// directory) with probability `insertion_probability`, a `.gitignore`
    /// rewrite (5% of the remainder), or otherwise a rename / overwrite /
    /// deletion of an existing entry — and returns the synthesized fs events
    /// describing what changed.
    fn randomly_mutate_tree(
        root_path: &Path,
        insertion_probability: f64,
        rng: &mut impl Rng,
    ) -> Result<Vec<fsevent::Event>> {
        let root_path = root_path.canonicalize().unwrap();
        let (dirs, files) = read_dir_recursive(root_path.clone());

        let mut events = Vec::new();
        // Synthesize an fsevent for `path`; the id is just the current epoch
        // second and the flags are left empty.
        let mut record_event = |path: PathBuf| {
            events.push(fsevent::Event {
                event_id: SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                flags: fsevent::StreamFlags::empty(),
                path,
            });
        };

        // Force an insertion while the tree is effectively empty (only the
        // root directory, no files) so there is always something to mutate.
        if (files.is_empty() && dirs.len() == 1) || rng.gen_bool(insertion_probability) {
            let path = dirs.choose(rng).unwrap();
            let new_path = path.join(gen_name(rng));

            if rng.gen() {
                log::info!("Creating dir {:?}", new_path.strip_prefix(root_path)?);
                std::fs::create_dir(&new_path)?;
            } else {
                log::info!("Creating file {:?}", new_path.strip_prefix(root_path)?);
                std::fs::write(&new_path, "")?;
            }
            record_event(new_path);
        } else if rng.gen_bool(0.05) {
            // Write a `.gitignore` into a random directory, listing a random
            // subset of the files and subdirectories beneath it.
            let ignore_dir_path = dirs.choose(rng).unwrap();
            let ignore_path = ignore_dir_path.join(&*GITIGNORE);

            let (subdirs, subfiles) = read_dir_recursive(ignore_dir_path.clone());
            let files_to_ignore = {
                let len = rng.gen_range(0..=subfiles.len());
                subfiles.choose_multiple(rng, len)
            };
            let dirs_to_ignore = {
                let len = rng.gen_range(0..subdirs.len());
                subdirs.choose_multiple(rng, len)
            };

            let mut ignore_contents = String::new();
            for path_to_ignore in files_to_ignore.chain(dirs_to_ignore) {
                write!(
                    ignore_contents,
                    "{}\n",
                    path_to_ignore
                        .strip_prefix(&ignore_dir_path)?
                        .to_str()
                        .unwrap()
                )
                .unwrap();
            }
            log::info!(
                "Creating {:?} with contents:\n{}",
                ignore_path.strip_prefix(&root_path)?,
                ignore_contents
            );
            std::fs::write(&ignore_path, ignore_contents).unwrap();
            record_event(ignore_path);
        } else {
            // Pick an existing file or non-root directory to rename or delete.
            let old_path = {
                let file_path = files.choose(rng);
                // dirs[0] is the worktree root itself; never move or delete it.
                let dir_path = dirs[1..].choose(rng);
                file_path.into_iter().chain(dir_path).choose(rng).unwrap()
            };

            let is_rename = rng.gen();
            if is_rename {
                // Pick a destination that is not inside the entry being moved.
                let new_path_parent = dirs
                    .iter()
                    .filter(|d| !d.starts_with(old_path))
                    .choose(rng)
                    .unwrap();

                // 30% of the time, replace an existing directory wholesale.
                let overwrite_existing_dir =
                    !old_path.starts_with(&new_path_parent) && rng.gen_bool(0.3);
                let new_path = if overwrite_existing_dir {
                    std::fs::remove_dir_all(&new_path_parent).ok();
                    new_path_parent.to_path_buf()
                } else {
                    new_path_parent.join(gen_name(rng))
                };

                log::info!(
                    "Renaming {:?} to {}{:?}",
                    old_path.strip_prefix(&root_path)?,
                    if overwrite_existing_dir {
                        "overwrite "
                    } else {
                        ""
                    },
                    new_path.strip_prefix(&root_path)?
                );
                std::fs::rename(&old_path, &new_path)?;
                record_event(old_path.clone());
                record_event(new_path);
            } else if old_path.is_dir() {
                // Record an event for every path removed along with the
                // directory, not just the directory itself.
                let (dirs, files) = read_dir_recursive(old_path.clone());

                log::info!("Deleting dir {:?}", old_path.strip_prefix(&root_path)?);
                std::fs::remove_dir_all(&old_path).unwrap();
                for file in files {
                    record_event(file);
                }
                for dir in dirs {
                    record_event(dir);
                }
            } else {
                log::info!("Deleting file {:?}", old_path.strip_prefix(&root_path)?);
                std::fs::remove_file(old_path).unwrap();
                record_event(old_path.clone());
            }
        }

        Ok(events)
    }
3884
3885 fn read_dir_recursive(path: PathBuf) -> (Vec<PathBuf>, Vec<PathBuf>) {
3886 let child_entries = std::fs::read_dir(&path).unwrap();
3887 let mut dirs = vec![path];
3888 let mut files = Vec::new();
3889 for child_entry in child_entries {
3890 let child_path = child_entry.unwrap().path();
3891 if child_path.is_dir() {
3892 let (child_dirs, child_files) = read_dir_recursive(child_path);
3893 dirs.extend(child_dirs);
3894 files.extend(child_files);
3895 } else {
3896 files.push(child_path);
3897 }
3898 }
3899 (dirs, files)
3900 }
3901
3902 fn gen_name(rng: &mut impl Rng) -> String {
3903 (0..6)
3904 .map(|_| rng.sample(rand::distributions::Alphanumeric))
3905 .map(char::from)
3906 .collect()
3907 }
3908
    impl Snapshot {
        /// Test helper: asserts internal consistency of the snapshot.
        fn check_invariants(&self) {
            // Every file entry must be yielded by `files`, and every
            // non-ignored file by the visible-files iterator, in the same
            // order as the path-sorted entry tree...
            let mut files = self.files(true, 0);
            let mut visible_files = self.files(false, 0);
            for entry in self.entries_by_path.cursor::<()>() {
                if entry.is_file() {
                    assert_eq!(files.next().unwrap().inode, entry.inode);
                    if !entry.is_ignored {
                        assert_eq!(visible_files.next().unwrap().inode, entry.inode);
                    }
                }
            }
            // ...and neither iterator may produce extra entries.
            assert!(files.next().is_none());
            assert!(visible_files.next().is_none());

            // Walk the tree through `child_entries`. NOTE(review): despite the
            // `bfs_paths` name, popping from the end of the stack — with each
            // node's children inserted in order at the pre-pop stack length —
            // makes this a depth-first, path-ordered traversal.
            let mut bfs_paths = Vec::new();
            let mut stack = vec![Path::new("")];
            while let Some(path) = stack.pop() {
                bfs_paths.push(path);
                let ix = stack.len();
                for child_entry in self.child_entries(path) {
                    stack.insert(ix, &child_entry.path);
                }
            }

            // The parent/child-link traversal must agree with the flat,
            // path-sorted entry list.
            let dfs_paths = self
                .entries_by_path
                .cursor::<()>()
                .map(|e| e.path.as_ref())
                .collect::<Vec<_>>();
            assert_eq!(bfs_paths, dfs_paths);

            // Every tracked ignore must correspond to an existing directory
            // entry that actually contains a `.gitignore` entry.
            for (ignore_parent_path, _) in &self.ignores {
                assert!(self.entry_for_path(ignore_parent_path).is_some());
                assert!(self
                    .entry_for_path(ignore_parent_path.join(&*GITIGNORE))
                    .is_some());
            }
        }

        /// Test helper: flattens the snapshot into `(path, inode, is_ignored)`
        /// tuples sorted by path, optionally excluding ignored entries.
        fn to_vec(&self, include_ignored: bool) -> Vec<(&Path, u64, bool)> {
            let mut paths = Vec::new();
            for entry in self.entries_by_path.cursor::<()>() {
                if include_ignored || !entry.is_ignored {
                    paths.push((entry.path.as_ref(), entry.inode, entry.is_ignored));
                }
            }
            paths.sort_by(|a, b| a.0.cmp(&b.0));
            paths
        }
    }
3960}