1use super::{
2 fs::{self, Fs},
3 ignore::IgnoreStack,
4 DiagnosticSummary,
5};
6use ::ignore::gitignore::{Gitignore, GitignoreBuilder};
7use anyhow::{anyhow, Context, Result};
8use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
9use clock::ReplicaId;
10use collections::{hash_map, HashMap};
11use futures::{Stream, StreamExt};
12use fuzzy::CharBag;
13use gpui::{
14 executor, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
15 Task, UpgradeModelHandle, WeakModelHandle,
16};
17use language::{
18 Buffer, Diagnostic, DiagnosticEntry, DiagnosticSeverity, File as _, Language, LanguageRegistry,
19 Operation, PointUtf16, Rope,
20};
21use lazy_static::lazy_static;
22use lsp::LanguageServer;
23use parking_lot::Mutex;
24use postage::{
25 prelude::{Sink as _, Stream as _},
26 watch,
27};
28use serde::Deserialize;
29use smol::channel::{self, Sender};
30use std::{
31 any::Any,
32 cmp::{self, Ordering},
33 convert::{TryFrom, TryInto},
34 ffi::{OsStr, OsString},
35 fmt,
36 future::Future,
37 ops::{Deref, Range},
38 path::{Path, PathBuf},
39 sync::{
40 atomic::{AtomicUsize, Ordering::SeqCst},
41 Arc,
42 },
43 time::{Duration, SystemTime},
44};
45use sum_tree::Bias;
46use sum_tree::{Edit, SeekTarget, SumTree};
47use util::{post_inc, ResultExt, TryFutureExt};
48
49lazy_static! {
50 static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
51}
52
53#[derive(Clone, Debug)]
54enum ScanState {
55 Idle,
56 Scanning,
57 Err(Arc<anyhow::Error>),
58}
59
60pub enum Worktree {
61 Local(LocalWorktree),
62 Remote(RemoteWorktree),
63}
64
/// Events emitted by a `Worktree` model.
pub enum Event {
    /// Emitted by `Worktree::handle_unshare` when an `UnshareWorktree` message arrives.
    Closed,
}
68
69#[derive(Clone, Debug)]
70pub struct Collaborator {
71 pub user: Arc<User>,
72 pub peer_id: PeerId,
73 pub replica_id: ReplicaId,
74}
75
76impl Collaborator {
77 fn from_proto(
78 message: proto::Collaborator,
79 user_store: &ModelHandle<UserStore>,
80 cx: &mut AsyncAppContext,
81 ) -> impl Future<Output = Result<Self>> {
82 let user = user_store.update(cx, |user_store, cx| {
83 user_store.fetch_user(message.user_id, cx)
84 });
85
86 async move {
87 Ok(Self {
88 peer_id: PeerId(message.peer_id),
89 user: user.await?,
90 replica_id: message.replica_id as ReplicaId,
91 })
92 }
93 }
94}
95
96impl Entity for Worktree {
97 type Event = Event;
98
99 fn release(&mut self, cx: &mut MutableAppContext) {
100 match self {
101 Self::Local(tree) => {
102 if let Some(worktree_id) = *tree.remote_id.borrow() {
103 let rpc = tree.client.clone();
104 cx.spawn(|_| async move {
105 if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await {
106 log::error!("error closing worktree: {}", err);
107 }
108 })
109 .detach();
110 }
111 }
112 Self::Remote(tree) => {
113 let rpc = tree.client.clone();
114 let worktree_id = tree.remote_id;
115 cx.spawn(|_| async move {
116 if let Err(err) = rpc.send(proto::LeaveWorktree { worktree_id }).await {
117 log::error!("error closing worktree: {}", err);
118 }
119 })
120 .detach();
121 }
122 }
123 }
124
125 fn app_will_quit(
126 &mut self,
127 _: &mut MutableAppContext,
128 ) -> Option<std::pin::Pin<Box<dyn 'static + Future<Output = ()>>>> {
129 use futures::FutureExt;
130
131 if let Self::Local(worktree) = self {
132 let shutdown_futures = worktree
133 .language_servers
134 .drain()
135 .filter_map(|(_, server)| server.shutdown())
136 .collect::<Vec<_>>();
137 Some(
138 async move {
139 futures::future::join_all(shutdown_futures).await;
140 }
141 .boxed(),
142 )
143 } else {
144 None
145 }
146 }
147}
148
149impl Worktree {
150 pub async fn open_local(
151 client: Arc<Client>,
152 user_store: ModelHandle<UserStore>,
153 path: impl Into<Arc<Path>>,
154 fs: Arc<dyn Fs>,
155 languages: Arc<LanguageRegistry>,
156 cx: &mut AsyncAppContext,
157 ) -> Result<ModelHandle<Self>> {
158 let (tree, scan_states_tx) =
159 LocalWorktree::new(client, user_store, path, fs.clone(), languages, cx).await?;
160 tree.update(cx, |tree, cx| {
161 let tree = tree.as_local_mut().unwrap();
162 let abs_path = tree.snapshot.abs_path.clone();
163 let background_snapshot = tree.background_snapshot.clone();
164 let background = cx.background().clone();
165 tree._background_scanner_task = Some(cx.background().spawn(async move {
166 let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
167 let scanner =
168 BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
169 scanner.run(events).await;
170 }));
171 });
172 Ok(tree)
173 }
174
175 pub async fn open_remote(
176 client: Arc<Client>,
177 id: u64,
178 languages: Arc<LanguageRegistry>,
179 user_store: ModelHandle<UserStore>,
180 cx: &mut AsyncAppContext,
181 ) -> Result<ModelHandle<Self>> {
182 let response = client
183 .request(proto::JoinWorktree { worktree_id: id })
184 .await?;
185 Worktree::remote(response, client, user_store, languages, cx).await
186 }
187
188 async fn remote(
189 join_response: proto::JoinWorktreeResponse,
190 client: Arc<Client>,
191 user_store: ModelHandle<UserStore>,
192 languages: Arc<LanguageRegistry>,
193 cx: &mut AsyncAppContext,
194 ) -> Result<ModelHandle<Self>> {
195 let worktree = join_response
196 .worktree
197 .ok_or_else(|| anyhow!("empty worktree"))?;
198
199 let remote_id = worktree.id;
200 let replica_id = join_response.replica_id as ReplicaId;
201 let root_char_bag: CharBag = worktree
202 .root_name
203 .chars()
204 .map(|c| c.to_ascii_lowercase())
205 .collect();
206 let root_name = worktree.root_name.clone();
207 let (entries_by_path, entries_by_id) = cx
208 .background()
209 .spawn(async move {
210 let mut entries_by_path_edits = Vec::new();
211 let mut entries_by_id_edits = Vec::new();
212 for entry in worktree.entries {
213 match Entry::try_from((&root_char_bag, entry)) {
214 Ok(entry) => {
215 entries_by_id_edits.push(Edit::Insert(PathEntry {
216 id: entry.id,
217 path: entry.path.clone(),
218 is_ignored: entry.is_ignored,
219 scan_id: 0,
220 }));
221 entries_by_path_edits.push(Edit::Insert(entry));
222 }
223 Err(err) => log::warn!("error for remote worktree entry {:?}", err),
224 }
225 }
226
227 let mut entries_by_path = SumTree::new();
228 let mut entries_by_id = SumTree::new();
229 entries_by_path.edit(entries_by_path_edits, &());
230 entries_by_id.edit(entries_by_id_edits, &());
231 (entries_by_path, entries_by_id)
232 })
233 .await;
234
235 let user_ids = join_response
236 .collaborators
237 .iter()
238 .map(|peer| peer.user_id)
239 .collect();
240 user_store
241 .update(cx, |user_store, cx| user_store.load_users(user_ids, cx))
242 .await?;
243 let mut collaborators = HashMap::default();
244 for message in join_response.collaborators {
245 let collaborator = Collaborator::from_proto(message, &user_store, cx).await?;
246 collaborators.insert(collaborator.peer_id, collaborator);
247 }
248
249 let worktree = cx.update(|cx| {
250 cx.add_model(|cx: &mut ModelContext<Worktree>| {
251 let snapshot = Snapshot {
252 id: cx.model_id(),
253 scan_id: 0,
254 abs_path: Path::new("").into(),
255 root_name,
256 root_char_bag,
257 ignores: Default::default(),
258 entries_by_path,
259 entries_by_id,
260 removed_entry_ids: Default::default(),
261 next_entry_id: Default::default(),
262 };
263
264 let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
265 let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());
266
267 cx.background()
268 .spawn(async move {
269 while let Some(update) = updates_rx.recv().await {
270 let mut snapshot = snapshot_tx.borrow().clone();
271 if let Err(error) = snapshot.apply_update(update) {
272 log::error!("error applying worktree update: {}", error);
273 }
274 *snapshot_tx.borrow_mut() = snapshot;
275 }
276 })
277 .detach();
278
279 {
280 let mut snapshot_rx = snapshot_rx.clone();
281 cx.spawn_weak(|this, mut cx| async move {
282 while let Some(_) = snapshot_rx.recv().await {
283 if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
284 this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
285 } else {
286 break;
287 }
288 }
289 })
290 .detach();
291 }
292
293 let _subscriptions = vec![
294 client.subscribe_to_entity(remote_id, cx, Self::handle_add_collaborator),
295 client.subscribe_to_entity(remote_id, cx, Self::handle_remove_collaborator),
296 client.subscribe_to_entity(remote_id, cx, Self::handle_update),
297 client.subscribe_to_entity(remote_id, cx, Self::handle_update_buffer),
298 client.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved),
299 client.subscribe_to_entity(remote_id, cx, Self::handle_unshare),
300 ];
301
302 Worktree::Remote(RemoteWorktree {
303 remote_id,
304 replica_id,
305 snapshot,
306 snapshot_rx,
307 updates_tx,
308 client: client.clone(),
309 loading_buffers: Default::default(),
310 open_buffers: Default::default(),
311 diagnostic_summaries: HashMap::default(),
312 collaborators,
313 queued_operations: Default::default(),
314 languages,
315 user_store,
316 _subscriptions,
317 })
318 })
319 });
320
321 Ok(worktree)
322 }
323
324 pub fn as_local(&self) -> Option<&LocalWorktree> {
325 if let Worktree::Local(worktree) = self {
326 Some(worktree)
327 } else {
328 None
329 }
330 }
331
332 pub fn as_local_mut(&mut self) -> Option<&mut LocalWorktree> {
333 if let Worktree::Local(worktree) = self {
334 Some(worktree)
335 } else {
336 None
337 }
338 }
339
340 pub fn as_remote_mut(&mut self) -> Option<&mut RemoteWorktree> {
341 if let Worktree::Remote(worktree) = self {
342 Some(worktree)
343 } else {
344 None
345 }
346 }
347
348 pub fn snapshot(&self) -> Snapshot {
349 match self {
350 Worktree::Local(worktree) => worktree.snapshot(),
351 Worktree::Remote(worktree) => worktree.snapshot(),
352 }
353 }
354
355 pub fn replica_id(&self) -> ReplicaId {
356 match self {
357 Worktree::Local(_) => 0,
358 Worktree::Remote(worktree) => worktree.replica_id,
359 }
360 }
361
362 pub fn languages(&self) -> &Arc<LanguageRegistry> {
363 match self {
364 Worktree::Local(worktree) => &worktree.languages,
365 Worktree::Remote(worktree) => &worktree.languages,
366 }
367 }
368
369 pub fn user_store(&self) -> &ModelHandle<UserStore> {
370 match self {
371 Worktree::Local(worktree) => &worktree.user_store,
372 Worktree::Remote(worktree) => &worktree.user_store,
373 }
374 }
375
376 pub fn handle_add_collaborator(
377 &mut self,
378 mut envelope: TypedEnvelope<proto::AddCollaborator>,
379 _: Arc<Client>,
380 cx: &mut ModelContext<Self>,
381 ) -> Result<()> {
382 let user_store = self.user_store().clone();
383 let collaborator = envelope
384 .payload
385 .collaborator
386 .take()
387 .ok_or_else(|| anyhow!("empty collaborator"))?;
388
389 cx.spawn(|this, mut cx| {
390 async move {
391 let collaborator =
392 Collaborator::from_proto(collaborator, &user_store, &mut cx).await?;
393 this.update(&mut cx, |this, cx| match this {
394 Worktree::Local(worktree) => worktree.add_collaborator(collaborator, cx),
395 Worktree::Remote(worktree) => worktree.add_collaborator(collaborator, cx),
396 });
397 Ok(())
398 }
399 .log_err()
400 })
401 .detach();
402
403 Ok(())
404 }
405
406 pub fn handle_remove_collaborator(
407 &mut self,
408 envelope: TypedEnvelope<proto::RemoveCollaborator>,
409 _: Arc<Client>,
410 cx: &mut ModelContext<Self>,
411 ) -> Result<()> {
412 match self {
413 Worktree::Local(worktree) => worktree.remove_collaborator(envelope, cx),
414 Worktree::Remote(worktree) => worktree.remove_collaborator(envelope, cx),
415 }
416 }
417
418 pub fn handle_update(
419 &mut self,
420 envelope: TypedEnvelope<proto::UpdateWorktree>,
421 _: Arc<Client>,
422 cx: &mut ModelContext<Self>,
423 ) -> anyhow::Result<()> {
424 self.as_remote_mut()
425 .unwrap()
426 .update_from_remote(envelope, cx)
427 }
428
429 pub fn handle_open_buffer(
430 &mut self,
431 envelope: TypedEnvelope<proto::OpenBuffer>,
432 rpc: Arc<Client>,
433 cx: &mut ModelContext<Self>,
434 ) -> anyhow::Result<()> {
435 let receipt = envelope.receipt();
436
437 let response = self
438 .as_local_mut()
439 .unwrap()
440 .open_remote_buffer(envelope, cx);
441
442 cx.background()
443 .spawn(
444 async move {
445 rpc.respond(receipt, response.await?).await?;
446 Ok(())
447 }
448 .log_err(),
449 )
450 .detach();
451
452 Ok(())
453 }
454
455 pub fn handle_close_buffer(
456 &mut self,
457 envelope: TypedEnvelope<proto::CloseBuffer>,
458 _: Arc<Client>,
459 cx: &mut ModelContext<Self>,
460 ) -> anyhow::Result<()> {
461 self.as_local_mut()
462 .unwrap()
463 .close_remote_buffer(envelope, cx)
464 }
465
466 pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
467 match self {
468 Worktree::Local(worktree) => &worktree.collaborators,
469 Worktree::Remote(worktree) => &worktree.collaborators,
470 }
471 }
472
473 pub fn diagnostic_summaries<'a>(
474 &'a self,
475 ) -> impl Iterator<Item = (Arc<Path>, DiagnosticSummary)> + 'a {
476 match self {
477 Worktree::Local(worktree) => &worktree.diagnostic_summaries,
478 Worktree::Remote(worktree) => &worktree.diagnostic_summaries,
479 }
480 .iter()
481 .map(|(path, summary)| (path.clone(), summary.clone()))
482 }
483
484 pub fn loading_buffers<'a>(&'a mut self) -> &'a mut LoadingBuffers {
485 match self {
486 Worktree::Local(worktree) => &mut worktree.loading_buffers,
487 Worktree::Remote(worktree) => &mut worktree.loading_buffers,
488 }
489 }
490
491 pub fn open_buffer(
492 &mut self,
493 path: impl AsRef<Path>,
494 cx: &mut ModelContext<Self>,
495 ) -> Task<Result<ModelHandle<Buffer>>> {
496 let path = path.as_ref();
497
498 // If there is already a buffer for the given path, then return it.
499 let existing_buffer = match self {
500 Worktree::Local(worktree) => worktree.get_open_buffer(path, cx),
501 Worktree::Remote(worktree) => worktree.get_open_buffer(path, cx),
502 };
503 if let Some(existing_buffer) = existing_buffer {
504 return cx.spawn(move |_, _| async move { Ok(existing_buffer) });
505 }
506
507 let path: Arc<Path> = Arc::from(path);
508 let mut loading_watch = match self.loading_buffers().entry(path.clone()) {
509 // If the given path is already being loaded, then wait for that existing
510 // task to complete and return the same buffer.
511 hash_map::Entry::Occupied(e) => e.get().clone(),
512
513 // Otherwise, record the fact that this path is now being loaded.
514 hash_map::Entry::Vacant(entry) => {
515 let (mut tx, rx) = postage::watch::channel();
516 entry.insert(rx.clone());
517
518 let load_buffer = match self {
519 Worktree::Local(worktree) => worktree.open_buffer(&path, cx),
520 Worktree::Remote(worktree) => worktree.open_buffer(&path, cx),
521 };
522 cx.spawn(move |this, mut cx| async move {
523 let result = load_buffer.await;
524
525 // After the buffer loads, record the fact that it is no longer
526 // loading.
527 this.update(&mut cx, |this, _| this.loading_buffers().remove(&path));
528 *tx.borrow_mut() = Some(result.map_err(|e| Arc::new(e)));
529 })
530 .detach();
531 rx
532 }
533 };
534
535 cx.spawn(|_, _| async move {
536 loop {
537 if let Some(result) = loading_watch.borrow().as_ref() {
538 return result.clone().map_err(|e| anyhow!("{}", e));
539 }
540 loading_watch.recv().await;
541 }
542 })
543 }
544
/// Test helper: reports whether any live open buffer has a file whose path
/// equals `path`. For remote worktrees only `RemoteBuffer::Loaded` entries
/// are considered.
#[cfg(feature = "test-support")]
pub fn has_open_buffer(&self, path: impl AsRef<Path>, cx: &AppContext) -> bool {
    let mut candidates: Box<dyn Iterator<Item = _>> = match self {
        Worktree::Local(local) => Box::new(local.open_buffers.values()),
        Worktree::Remote(remote) => {
            Box::new(remote.open_buffers.values().filter_map(|buf| {
                if let RemoteBuffer::Loaded(buf) = buf {
                    Some(buf)
                } else {
                    None
                }
            }))
        }
    };

    let path = path.as_ref();
    candidates.any(|buffer| {
        buffer
            .upgrade(cx)
            .and_then(|buffer| buffer.read(cx).file())
            .map_or(false, |file| file.path().as_ref() == path)
    })
}
571
572 pub fn handle_update_buffer(
573 &mut self,
574 envelope: TypedEnvelope<proto::UpdateBuffer>,
575 _: Arc<Client>,
576 cx: &mut ModelContext<Self>,
577 ) -> Result<()> {
578 let payload = envelope.payload.clone();
579 let buffer_id = payload.buffer_id as usize;
580 let ops = payload
581 .operations
582 .into_iter()
583 .map(|op| language::proto::deserialize_operation(op))
584 .collect::<Result<Vec<_>, _>>()?;
585
586 match self {
587 Worktree::Local(worktree) => {
588 let buffer = worktree
589 .open_buffers
590 .get(&buffer_id)
591 .and_then(|buf| buf.upgrade(cx))
592 .ok_or_else(|| {
593 anyhow!("invalid buffer {} in update buffer message", buffer_id)
594 })?;
595 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
596 }
597 Worktree::Remote(worktree) => match worktree.open_buffers.get_mut(&buffer_id) {
598 Some(RemoteBuffer::Operations(pending_ops)) => pending_ops.extend(ops),
599 Some(RemoteBuffer::Loaded(buffer)) => {
600 if let Some(buffer) = buffer.upgrade(cx) {
601 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
602 } else {
603 worktree
604 .open_buffers
605 .insert(buffer_id, RemoteBuffer::Operations(ops));
606 }
607 }
608 None => {
609 worktree
610 .open_buffers
611 .insert(buffer_id, RemoteBuffer::Operations(ops));
612 }
613 },
614 }
615
616 Ok(())
617 }
618
619 pub fn handle_save_buffer(
620 &mut self,
621 envelope: TypedEnvelope<proto::SaveBuffer>,
622 rpc: Arc<Client>,
623 cx: &mut ModelContext<Self>,
624 ) -> Result<()> {
625 let sender_id = envelope.original_sender_id()?;
626 let buffer = self
627 .as_local()
628 .unwrap()
629 .shared_buffers
630 .get(&sender_id)
631 .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
632 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
633
634 let receipt = envelope.receipt();
635 let worktree_id = envelope.payload.worktree_id;
636 let buffer_id = envelope.payload.buffer_id;
637 let save = cx.spawn(|_, mut cx| async move {
638 buffer.update(&mut cx, |buffer, cx| buffer.save(cx))?.await
639 });
640
641 cx.background()
642 .spawn(
643 async move {
644 let (version, mtime) = save.await?;
645
646 rpc.respond(
647 receipt,
648 proto::BufferSaved {
649 worktree_id,
650 buffer_id,
651 version: (&version).into(),
652 mtime: Some(mtime.into()),
653 },
654 )
655 .await?;
656
657 Ok(())
658 }
659 .log_err(),
660 )
661 .detach();
662
663 Ok(())
664 }
665
666 pub fn handle_buffer_saved(
667 &mut self,
668 envelope: TypedEnvelope<proto::BufferSaved>,
669 _: Arc<Client>,
670 cx: &mut ModelContext<Self>,
671 ) -> Result<()> {
672 let payload = envelope.payload.clone();
673 let worktree = self.as_remote_mut().unwrap();
674 if let Some(buffer) = worktree
675 .open_buffers
676 .get(&(payload.buffer_id as usize))
677 .and_then(|buf| buf.upgrade(cx))
678 {
679 buffer.update(cx, |buffer, cx| {
680 let version = payload.version.try_into()?;
681 let mtime = payload
682 .mtime
683 .ok_or_else(|| anyhow!("missing mtime"))?
684 .into();
685 buffer.did_save(version, mtime, None, cx);
686 Result::<_, anyhow::Error>::Ok(())
687 })?;
688 }
689 Ok(())
690 }
691
692 pub fn handle_unshare(
693 &mut self,
694 _: TypedEnvelope<proto::UnshareWorktree>,
695 _: Arc<Client>,
696 cx: &mut ModelContext<Self>,
697 ) -> Result<()> {
698 cx.emit(Event::Closed);
699 Ok(())
700 }
701
702 fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
703 match self {
704 Self::Local(worktree) => {
705 let is_fake_fs = worktree.fs.is_fake();
706 worktree.snapshot = worktree.background_snapshot.lock().clone();
707 if worktree.is_scanning() {
708 if worktree.poll_task.is_none() {
709 worktree.poll_task = Some(cx.spawn(|this, mut cx| async move {
710 if is_fake_fs {
711 smol::future::yield_now().await;
712 } else {
713 smol::Timer::after(Duration::from_millis(100)).await;
714 }
715 this.update(&mut cx, |this, cx| {
716 this.as_local_mut().unwrap().poll_task = None;
717 this.poll_snapshot(cx);
718 })
719 }));
720 }
721 } else {
722 worktree.poll_task.take();
723 self.update_open_buffers(cx);
724 }
725 }
726 Self::Remote(worktree) => {
727 worktree.snapshot = worktree.snapshot_rx.borrow().clone();
728 self.update_open_buffers(cx);
729 }
730 };
731
732 cx.notify();
733 }
734
735 fn update_open_buffers(&mut self, cx: &mut ModelContext<Self>) {
736 let open_buffers: Box<dyn Iterator<Item = _>> = match &self {
737 Self::Local(worktree) => Box::new(worktree.open_buffers.iter()),
738 Self::Remote(worktree) => {
739 Box::new(worktree.open_buffers.iter().filter_map(|(id, buf)| {
740 if let RemoteBuffer::Loaded(buf) = buf {
741 Some((id, buf))
742 } else {
743 None
744 }
745 }))
746 }
747 };
748
749 let local = self.as_local().is_some();
750 let worktree_path = self.abs_path.clone();
751 let worktree_handle = cx.handle();
752 let mut buffers_to_delete = Vec::new();
753 for (buffer_id, buffer) in open_buffers {
754 if let Some(buffer) = buffer.upgrade(cx) {
755 buffer.update(cx, |buffer, cx| {
756 if let Some(old_file) = buffer.file() {
757 let new_file = if let Some(entry) = old_file
758 .entry_id()
759 .and_then(|entry_id| self.entry_for_id(entry_id))
760 {
761 File {
762 is_local: local,
763 worktree_path: worktree_path.clone(),
764 entry_id: Some(entry.id),
765 mtime: entry.mtime,
766 path: entry.path.clone(),
767 worktree: worktree_handle.clone(),
768 }
769 } else if let Some(entry) = self.entry_for_path(old_file.path().as_ref()) {
770 File {
771 is_local: local,
772 worktree_path: worktree_path.clone(),
773 entry_id: Some(entry.id),
774 mtime: entry.mtime,
775 path: entry.path.clone(),
776 worktree: worktree_handle.clone(),
777 }
778 } else {
779 File {
780 is_local: local,
781 worktree_path: worktree_path.clone(),
782 entry_id: None,
783 path: old_file.path().clone(),
784 mtime: old_file.mtime(),
785 worktree: worktree_handle.clone(),
786 }
787 };
788
789 if let Some(task) = buffer.file_updated(Box::new(new_file), cx) {
790 task.detach();
791 }
792 }
793 });
794 } else {
795 buffers_to_delete.push(*buffer_id);
796 }
797 }
798
799 for buffer_id in buffers_to_delete {
800 match self {
801 Self::Local(worktree) => {
802 worktree.open_buffers.remove(&buffer_id);
803 }
804 Self::Remote(worktree) => {
805 worktree.open_buffers.remove(&buffer_id);
806 }
807 }
808 }
809 }
810
811 fn update_diagnostics(
812 &mut self,
813 params: lsp::PublishDiagnosticsParams,
814 cx: &mut ModelContext<Worktree>,
815 ) -> Result<()> {
816 let this = self.as_local_mut().ok_or_else(|| anyhow!("not local"))?;
817 let abs_path = params
818 .uri
819 .to_file_path()
820 .map_err(|_| anyhow!("URI is not a file"))?;
821 let worktree_path = Arc::from(
822 abs_path
823 .strip_prefix(&this.abs_path)
824 .context("path is not within worktree")?,
825 );
826
827 let mut group_ids_by_diagnostic_range = HashMap::default();
828 let mut diagnostics_by_group_id = HashMap::default();
829 let mut next_group_id = 0;
830 for diagnostic in ¶ms.diagnostics {
831 let source = diagnostic.source.as_ref();
832 let code = diagnostic.code.as_ref();
833 let group_id = diagnostic_ranges(&diagnostic, &abs_path)
834 .find_map(|range| group_ids_by_diagnostic_range.get(&(source, code, range)))
835 .copied()
836 .unwrap_or_else(|| {
837 let group_id = post_inc(&mut next_group_id);
838 for range in diagnostic_ranges(&diagnostic, &abs_path) {
839 group_ids_by_diagnostic_range.insert((source, code, range), group_id);
840 }
841 group_id
842 });
843
844 diagnostics_by_group_id
845 .entry(group_id)
846 .or_insert(Vec::new())
847 .push(DiagnosticEntry {
848 range: diagnostic.range.start.to_point_utf16()
849 ..diagnostic.range.end.to_point_utf16(),
850 diagnostic: Diagnostic {
851 source: diagnostic.source.clone(),
852 code: diagnostic.code.clone().map(|code| match code {
853 lsp::NumberOrString::Number(code) => code.to_string(),
854 lsp::NumberOrString::String(code) => code,
855 }),
856 severity: diagnostic.severity.unwrap_or(DiagnosticSeverity::ERROR),
857 message: diagnostic.message.clone(),
858 group_id,
859 is_primary: false,
860 },
861 });
862 }
863
864 let diagnostics = diagnostics_by_group_id
865 .into_values()
866 .flat_map(|mut diagnostics| {
867 let primary = diagnostics
868 .iter_mut()
869 .min_by_key(|entry| entry.diagnostic.severity)
870 .unwrap();
871 primary.diagnostic.is_primary = true;
872 diagnostics
873 })
874 .collect::<Vec<_>>();
875
876 for buffer in this.open_buffers.values() {
877 if let Some(buffer) = buffer.upgrade(cx) {
878 if buffer
879 .read(cx)
880 .file()
881 .map_or(false, |file| *file.path() == worktree_path)
882 {
883 let (remote_id, operation) = buffer.update(cx, |buffer, cx| {
884 (
885 buffer.remote_id(),
886 buffer.update_diagnostics(params.version, diagnostics.clone(), cx),
887 )
888 });
889 self.send_buffer_update(remote_id, operation?, cx);
890 break;
891 }
892 }
893 }
894
895 let this = self.as_local_mut().unwrap();
896 this.diagnostic_summaries
897 .insert(worktree_path.clone(), DiagnosticSummary::new(&diagnostics));
898 this.diagnostics.insert(worktree_path.clone(), diagnostics);
899 Ok(())
900 }
901
902 fn send_buffer_update(
903 &mut self,
904 buffer_id: u64,
905 operation: Operation,
906 cx: &mut ModelContext<Self>,
907 ) {
908 if let Some((rpc, remote_id)) = match self {
909 Worktree::Local(worktree) => worktree
910 .remote_id
911 .borrow()
912 .map(|id| (worktree.client.clone(), id)),
913 Worktree::Remote(worktree) => Some((worktree.client.clone(), worktree.remote_id)),
914 } {
915 cx.spawn(|worktree, mut cx| async move {
916 if let Err(error) = rpc
917 .request(proto::UpdateBuffer {
918 worktree_id: remote_id,
919 buffer_id,
920 operations: vec![language::proto::serialize_operation(&operation)],
921 })
922 .await
923 {
924 worktree.update(&mut cx, |worktree, _| {
925 log::error!("error sending buffer operation: {}", error);
926 match worktree {
927 Worktree::Local(t) => &mut t.queued_operations,
928 Worktree::Remote(t) => &mut t.queued_operations,
929 }
930 .push((buffer_id, operation));
931 });
932 }
933 })
934 .detach();
935 }
936 }
937}
938
939#[derive(Clone)]
940pub struct Snapshot {
941 id: usize,
942 scan_id: usize,
943 abs_path: Arc<Path>,
944 root_name: String,
945 root_char_bag: CharBag,
946 ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
947 entries_by_path: SumTree<Entry>,
948 entries_by_id: SumTree<PathEntry>,
949 removed_entry_ids: HashMap<u64, usize>,
950 next_entry_id: Arc<AtomicUsize>,
951}
952
953pub struct LocalWorktree {
954 snapshot: Snapshot,
955 config: WorktreeConfig,
956 background_snapshot: Arc<Mutex<Snapshot>>,
957 last_scan_state_rx: watch::Receiver<ScanState>,
958 _background_scanner_task: Option<Task<()>>,
959 _maintain_remote_id_task: Task<Option<()>>,
960 poll_task: Option<Task<()>>,
961 remote_id: watch::Receiver<Option<u64>>,
962 share: Option<ShareState>,
963 loading_buffers: LoadingBuffers,
964 open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
965 shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
966 diagnostics: HashMap<Arc<Path>, Vec<DiagnosticEntry<PointUtf16>>>,
967 diagnostic_summaries: HashMap<Arc<Path>, DiagnosticSummary>,
968 collaborators: HashMap<PeerId, Collaborator>,
969 queued_operations: Vec<(u64, Operation)>,
970 languages: Arc<LanguageRegistry>,
971 client: Arc<Client>,
972 user_store: ModelHandle<UserStore>,
973 fs: Arc<dyn Fs>,
974 language_servers: HashMap<String, Arc<LanguageServer>>,
975}
976
977struct ShareState {
978 snapshots_tx: Sender<Snapshot>,
979 _subscriptions: Vec<client::Subscription>,
980}
981
982pub struct RemoteWorktree {
983 remote_id: u64,
984 snapshot: Snapshot,
985 snapshot_rx: watch::Receiver<Snapshot>,
986 client: Arc<Client>,
987 updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
988 replica_id: ReplicaId,
989 loading_buffers: LoadingBuffers,
990 open_buffers: HashMap<usize, RemoteBuffer>,
991 collaborators: HashMap<PeerId, Collaborator>,
992 diagnostic_summaries: HashMap<Arc<Path>, DiagnosticSummary>,
993 languages: Arc<LanguageRegistry>,
994 user_store: ModelHandle<UserStore>,
995 queued_operations: Vec<(u64, Operation)>,
996 _subscriptions: Vec<client::Subscription>,
997}
998
999type LoadingBuffers = HashMap<
1000 Arc<Path>,
1001 postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
1002>;
1003
1004#[derive(Default, Deserialize)]
1005struct WorktreeConfig {
1006 collaborators: Vec<String>,
1007}
1008
1009impl LocalWorktree {
1010 async fn new(
1011 client: Arc<Client>,
1012 user_store: ModelHandle<UserStore>,
1013 path: impl Into<Arc<Path>>,
1014 fs: Arc<dyn Fs>,
1015 languages: Arc<LanguageRegistry>,
1016 cx: &mut AsyncAppContext,
1017 ) -> Result<(ModelHandle<Worktree>, Sender<ScanState>)> {
1018 let abs_path = path.into();
1019 let path: Arc<Path> = Arc::from(Path::new(""));
1020 let next_entry_id = AtomicUsize::new(0);
1021
1022 // After determining whether the root entry is a file or a directory, populate the
1023 // snapshot's "root name", which will be used for the purpose of fuzzy matching.
1024 let root_name = abs_path
1025 .file_name()
1026 .map_or(String::new(), |f| f.to_string_lossy().to_string());
1027 let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
1028 let metadata = fs.metadata(&abs_path).await?;
1029
1030 let mut config = WorktreeConfig::default();
1031 if let Ok(zed_toml) = fs.load(&abs_path.join(".zed.toml")).await {
1032 if let Ok(parsed) = toml::from_str(&zed_toml) {
1033 config = parsed;
1034 }
1035 }
1036
1037 let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
1038 let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
1039 let tree = cx.add_model(move |cx: &mut ModelContext<Worktree>| {
1040 let mut snapshot = Snapshot {
1041 id: cx.model_id(),
1042 scan_id: 0,
1043 abs_path,
1044 root_name: root_name.clone(),
1045 root_char_bag,
1046 ignores: Default::default(),
1047 entries_by_path: Default::default(),
1048 entries_by_id: Default::default(),
1049 removed_entry_ids: Default::default(),
1050 next_entry_id: Arc::new(next_entry_id),
1051 };
1052 if let Some(metadata) = metadata {
1053 snapshot.insert_entry(
1054 Entry::new(
1055 path.into(),
1056 &metadata,
1057 &snapshot.next_entry_id,
1058 snapshot.root_char_bag,
1059 ),
1060 fs.as_ref(),
1061 );
1062 }
1063
1064 let (mut remote_id_tx, remote_id_rx) = watch::channel();
1065 let _maintain_remote_id_task = cx.spawn_weak({
1066 let rpc = client.clone();
1067 move |this, cx| {
1068 async move {
1069 let mut status = rpc.status();
1070 while let Some(status) = status.recv().await {
1071 if let Some(this) = this.upgrade(&cx) {
1072 let remote_id = if let client::Status::Connected { .. } = status {
1073 let authorized_logins = this.read_with(&cx, |this, _| {
1074 this.as_local().unwrap().config.collaborators.clone()
1075 });
1076 let response = rpc
1077 .request(proto::OpenWorktree {
1078 root_name: root_name.clone(),
1079 authorized_logins,
1080 })
1081 .await?;
1082
1083 Some(response.worktree_id)
1084 } else {
1085 None
1086 };
1087 if remote_id_tx.send(remote_id).await.is_err() {
1088 break;
1089 }
1090 }
1091 }
1092 Ok(())
1093 }
1094 .log_err()
1095 }
1096 });
1097
1098 let tree = Self {
1099 snapshot: snapshot.clone(),
1100 config,
1101 remote_id: remote_id_rx,
1102 background_snapshot: Arc::new(Mutex::new(snapshot)),
1103 last_scan_state_rx,
1104 _background_scanner_task: None,
1105 _maintain_remote_id_task,
1106 share: None,
1107 poll_task: None,
1108 loading_buffers: Default::default(),
1109 open_buffers: Default::default(),
1110 shared_buffers: Default::default(),
1111 diagnostics: Default::default(),
1112 diagnostic_summaries: Default::default(),
1113 queued_operations: Default::default(),
1114 collaborators: Default::default(),
1115 languages,
1116 client,
1117 user_store,
1118 fs,
1119 language_servers: Default::default(),
1120 };
1121
1122 cx.spawn_weak(|this, mut cx| async move {
1123 while let Ok(scan_state) = scan_states_rx.recv().await {
1124 if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
1125 let to_send = handle.update(&mut cx, |this, cx| {
1126 last_scan_state_tx.blocking_send(scan_state).ok();
1127 this.poll_snapshot(cx);
1128 let tree = this.as_local_mut().unwrap();
1129 if !tree.is_scanning() {
1130 if let Some(share) = tree.share.as_ref() {
1131 return Some((tree.snapshot(), share.snapshots_tx.clone()));
1132 }
1133 }
1134 None
1135 });
1136
1137 if let Some((snapshot, snapshots_to_send_tx)) = to_send {
1138 if let Err(err) = snapshots_to_send_tx.send(snapshot).await {
1139 log::error!("error submitting snapshot to send {}", err);
1140 }
1141 }
1142 } else {
1143 break;
1144 }
1145 }
1146 })
1147 .detach();
1148
1149 Worktree::Local(tree)
1150 });
1151
1152 Ok((tree, scan_states_tx))
1153 }
1154
    /// Returns a reference to the language registry held by this worktree.
    pub fn languages(&self) -> &LanguageRegistry {
        &self.languages
    }
1158
1159 pub fn ensure_language_server(
1160 &mut self,
1161 language: &Language,
1162 cx: &mut ModelContext<Worktree>,
1163 ) -> Option<Arc<LanguageServer>> {
1164 if let Some(server) = self.language_servers.get(language.name()) {
1165 return Some(server.clone());
1166 }
1167
1168 if let Some(language_server) = language
1169 .start_server(self.abs_path(), cx)
1170 .log_err()
1171 .flatten()
1172 {
1173 let (diagnostics_tx, diagnostics_rx) = smol::channel::unbounded();
1174 language_server
1175 .on_notification::<lsp::notification::PublishDiagnostics, _>(move |params| {
1176 smol::block_on(diagnostics_tx.send(params)).ok();
1177 })
1178 .detach();
1179 cx.spawn_weak(|this, mut cx| async move {
1180 while let Ok(diagnostics) = diagnostics_rx.recv().await {
1181 if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
1182 handle.update(&mut cx, |this, cx| {
1183 this.update_diagnostics(diagnostics, cx).log_err();
1184 });
1185 } else {
1186 break;
1187 }
1188 }
1189 })
1190 .detach();
1191
1192 self.language_servers
1193 .insert(language.name().to_string(), language_server.clone());
1194 Some(language_server.clone())
1195 } else {
1196 None
1197 }
1198 }
1199
1200 fn get_open_buffer(
1201 &mut self,
1202 path: &Path,
1203 cx: &mut ModelContext<Worktree>,
1204 ) -> Option<ModelHandle<Buffer>> {
1205 let handle = cx.handle();
1206 let mut result = None;
1207 self.open_buffers.retain(|_buffer_id, buffer| {
1208 if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
1209 if let Some(file) = buffer.read(cx.as_ref()).file() {
1210 if file.worktree_id() == handle.id() && file.path().as_ref() == path {
1211 result = Some(buffer);
1212 }
1213 }
1214 true
1215 } else {
1216 false
1217 }
1218 });
1219 result
1220 }
1221
1222 fn open_buffer(
1223 &mut self,
1224 path: &Path,
1225 cx: &mut ModelContext<Worktree>,
1226 ) -> Task<Result<ModelHandle<Buffer>>> {
1227 let path = Arc::from(path);
1228 cx.spawn(move |this, mut cx| async move {
1229 let (file, contents) = this
1230 .update(&mut cx, |t, cx| t.as_local().unwrap().load(&path, cx))
1231 .await?;
1232
1233 let (diagnostics, language, language_server) = this.update(&mut cx, |this, cx| {
1234 let this = this.as_local_mut().unwrap();
1235 let diagnostics = this.diagnostics.remove(&path);
1236 let language = this.languages.select_language(file.full_path()).cloned();
1237 let server = language
1238 .as_ref()
1239 .and_then(|language| this.ensure_language_server(language, cx));
1240 (diagnostics, language, server)
1241 });
1242
1243 let buffer = cx.add_model(|cx| {
1244 let mut buffer = Buffer::from_file(0, contents, Box::new(file), cx);
1245 buffer.set_language(language, language_server, cx);
1246 if let Some(diagnostics) = diagnostics {
1247 buffer.update_diagnostics(None, diagnostics, cx).unwrap();
1248 }
1249 buffer
1250 });
1251
1252 this.update(&mut cx, |this, _| {
1253 let this = this.as_local_mut().unwrap();
1254 this.open_buffers.insert(buffer.id(), buffer.downgrade());
1255 });
1256
1257 Ok(buffer)
1258 })
1259 }
1260
1261 pub fn open_remote_buffer(
1262 &mut self,
1263 envelope: TypedEnvelope<proto::OpenBuffer>,
1264 cx: &mut ModelContext<Worktree>,
1265 ) -> Task<Result<proto::OpenBufferResponse>> {
1266 cx.spawn(|this, mut cx| async move {
1267 let peer_id = envelope.original_sender_id();
1268 let path = Path::new(&envelope.payload.path);
1269 let buffer = this
1270 .update(&mut cx, |this, cx| this.open_buffer(path, cx))
1271 .await?;
1272 this.update(&mut cx, |this, cx| {
1273 this.as_local_mut()
1274 .unwrap()
1275 .shared_buffers
1276 .entry(peer_id?)
1277 .or_default()
1278 .insert(buffer.id() as u64, buffer.clone());
1279
1280 Ok(proto::OpenBufferResponse {
1281 buffer: Some(buffer.update(cx.as_mut(), |buffer, _| buffer.to_proto())),
1282 })
1283 })
1284 })
1285 }
1286
1287 pub fn close_remote_buffer(
1288 &mut self,
1289 envelope: TypedEnvelope<proto::CloseBuffer>,
1290 cx: &mut ModelContext<Worktree>,
1291 ) -> Result<()> {
1292 if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
1293 shared_buffers.remove(&envelope.payload.buffer_id);
1294 cx.notify();
1295 }
1296
1297 Ok(())
1298 }
1299
1300 pub fn add_collaborator(
1301 &mut self,
1302 collaborator: Collaborator,
1303 cx: &mut ModelContext<Worktree>,
1304 ) {
1305 self.collaborators
1306 .insert(collaborator.peer_id, collaborator);
1307 cx.notify();
1308 }
1309
1310 pub fn remove_collaborator(
1311 &mut self,
1312 envelope: TypedEnvelope<proto::RemoveCollaborator>,
1313 cx: &mut ModelContext<Worktree>,
1314 ) -> Result<()> {
1315 let peer_id = PeerId(envelope.payload.peer_id);
1316 let replica_id = self
1317 .collaborators
1318 .remove(&peer_id)
1319 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?
1320 .replica_id;
1321 self.shared_buffers.remove(&peer_id);
1322 for (_, buffer) in &self.open_buffers {
1323 if let Some(buffer) = buffer.upgrade(cx) {
1324 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
1325 }
1326 }
1327 cx.notify();
1328
1329 Ok(())
1330 }
1331
1332 pub fn scan_complete(&self) -> impl Future<Output = ()> {
1333 let mut scan_state_rx = self.last_scan_state_rx.clone();
1334 async move {
1335 let mut scan_state = Some(scan_state_rx.borrow().clone());
1336 while let Some(ScanState::Scanning) = scan_state {
1337 scan_state = scan_state_rx.recv().await;
1338 }
1339 }
1340 }
1341
    /// Returns the current value of the `remote_id` watch, which is `None` while
    /// no remote id is available.
    pub fn remote_id(&self) -> Option<u64> {
        *self.remote_id.borrow()
    }
1345
1346 pub fn next_remote_id(&self) -> impl Future<Output = Option<u64>> {
1347 let mut remote_id = self.remote_id.clone();
1348 async move {
1349 while let Some(remote_id) = remote_id.recv().await {
1350 if remote_id.is_some() {
1351 return remote_id;
1352 }
1353 }
1354 None
1355 }
1356 }
1357
1358 fn is_scanning(&self) -> bool {
1359 if let ScanState::Scanning = *self.last_scan_state_rx.borrow() {
1360 true
1361 } else {
1362 false
1363 }
1364 }
1365
    /// Returns a clone of this worktree's foreground snapshot.
    pub fn snapshot(&self) -> Snapshot {
        self.snapshot.clone()
    }
1369
    /// Returns the `abs_path` recorded in this worktree's snapshot.
    pub fn abs_path(&self) -> &Path {
        self.snapshot.abs_path.as_ref()
    }
1373
    /// Returns whether `path` starts with this worktree's `abs_path`.
    pub fn contains_abs_path(&self, path: &Path) -> bool {
        path.starts_with(&self.snapshot.abs_path)
    }
1377
1378 fn absolutize(&self, path: &Path) -> PathBuf {
1379 if path.file_name().is_some() {
1380 self.snapshot.abs_path.join(path)
1381 } else {
1382 self.snapshot.abs_path.to_path_buf()
1383 }
1384 }
1385
1386 fn load(&self, path: &Path, cx: &mut ModelContext<Worktree>) -> Task<Result<(File, String)>> {
1387 let handle = cx.handle();
1388 let path = Arc::from(path);
1389 let worktree_path = self.abs_path.clone();
1390 let abs_path = self.absolutize(&path);
1391 let background_snapshot = self.background_snapshot.clone();
1392 let fs = self.fs.clone();
1393 cx.spawn(|this, mut cx| async move {
1394 let text = fs.load(&abs_path).await?;
1395 // Eagerly populate the snapshot with an updated entry for the loaded file
1396 let entry = refresh_entry(fs.as_ref(), &background_snapshot, path, &abs_path).await?;
1397 this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
1398 Ok((
1399 File {
1400 entry_id: Some(entry.id),
1401 worktree: handle,
1402 worktree_path,
1403 path: entry.path,
1404 mtime: entry.mtime,
1405 is_local: true,
1406 },
1407 text,
1408 ))
1409 })
1410 }
1411
1412 pub fn save_buffer_as(
1413 &self,
1414 buffer: ModelHandle<Buffer>,
1415 path: impl Into<Arc<Path>>,
1416 text: Rope,
1417 cx: &mut ModelContext<Worktree>,
1418 ) -> Task<Result<File>> {
1419 let save = self.save(path, text, cx);
1420 cx.spawn(|this, mut cx| async move {
1421 let entry = save.await?;
1422 this.update(&mut cx, |this, cx| {
1423 let this = this.as_local_mut().unwrap();
1424 this.open_buffers.insert(buffer.id(), buffer.downgrade());
1425 Ok(File {
1426 entry_id: Some(entry.id),
1427 worktree: cx.handle(),
1428 worktree_path: this.abs_path.clone(),
1429 path: entry.path,
1430 mtime: entry.mtime,
1431 is_local: true,
1432 })
1433 })
1434 })
1435 }
1436
1437 fn save(
1438 &self,
1439 path: impl Into<Arc<Path>>,
1440 text: Rope,
1441 cx: &mut ModelContext<Worktree>,
1442 ) -> Task<Result<Entry>> {
1443 let path = path.into();
1444 let abs_path = self.absolutize(&path);
1445 let background_snapshot = self.background_snapshot.clone();
1446 let fs = self.fs.clone();
1447 let save = cx.background().spawn(async move {
1448 fs.save(&abs_path, &text).await?;
1449 refresh_entry(fs.as_ref(), &background_snapshot, path.clone(), &abs_path).await
1450 });
1451
1452 cx.spawn(|this, mut cx| async move {
1453 let entry = save.await?;
1454 this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
1455 Ok(entry)
1456 })
1457 }
1458
1459 pub fn share(&mut self, cx: &mut ModelContext<Worktree>) -> Task<anyhow::Result<u64>> {
1460 let snapshot = self.snapshot();
1461 let share_request = self.share_request(cx);
1462 let rpc = self.client.clone();
1463 cx.spawn(|this, mut cx| async move {
1464 let share_request = if let Some(request) = share_request.await {
1465 request
1466 } else {
1467 return Err(anyhow!("failed to open worktree on the server"));
1468 };
1469
1470 let remote_id = share_request.worktree.as_ref().unwrap().id;
1471 let share_response = rpc.request(share_request).await?;
1472
1473 log::info!("sharing worktree {:?}", share_response);
1474 let (snapshots_to_send_tx, snapshots_to_send_rx) =
1475 smol::channel::unbounded::<Snapshot>();
1476
1477 cx.background()
1478 .spawn({
1479 let rpc = rpc.clone();
1480 async move {
1481 let mut prev_snapshot = snapshot;
1482 while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
1483 let message = snapshot.build_update(&prev_snapshot, remote_id, false);
1484 match rpc.send(message).await {
1485 Ok(()) => prev_snapshot = snapshot,
1486 Err(err) => log::error!("error sending snapshot diff {}", err),
1487 }
1488 }
1489 }
1490 })
1491 .detach();
1492
1493 this.update(&mut cx, |worktree, cx| {
1494 let _subscriptions = vec![
1495 rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_add_collaborator),
1496 rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_remove_collaborator),
1497 rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_open_buffer),
1498 rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_close_buffer),
1499 rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_update_buffer),
1500 rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_save_buffer),
1501 ];
1502
1503 let worktree = worktree.as_local_mut().unwrap();
1504 worktree.share = Some(ShareState {
1505 snapshots_tx: snapshots_to_send_tx,
1506 _subscriptions,
1507 });
1508 });
1509
1510 Ok(remote_id)
1511 })
1512 }
1513
1514 pub fn unshare(&mut self, cx: &mut ModelContext<Worktree>) {
1515 self.share.take();
1516 let rpc = self.client.clone();
1517 let remote_id = self.remote_id();
1518 cx.foreground()
1519 .spawn(
1520 async move {
1521 if let Some(worktree_id) = remote_id {
1522 rpc.send(proto::UnshareWorktree { worktree_id }).await?;
1523 }
1524 Ok(())
1525 }
1526 .log_err(),
1527 )
1528 .detach()
1529 }
1530
1531 fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<Option<proto::ShareWorktree>> {
1532 let remote_id = self.next_remote_id();
1533 let snapshot = self.snapshot();
1534 let root_name = self.root_name.clone();
1535 cx.background().spawn(async move {
1536 remote_id.await.map(|id| {
1537 let entries = snapshot
1538 .entries_by_path
1539 .cursor::<()>()
1540 .filter(|e| !e.is_ignored)
1541 .map(Into::into)
1542 .collect();
1543 proto::ShareWorktree {
1544 worktree: Some(proto::Worktree {
1545 id,
1546 root_name,
1547 entries,
1548 }),
1549 }
1550 })
1551 })
1552 }
1553}
1554
1555fn build_gitignore(abs_path: &Path, fs: &dyn Fs) -> Result<Gitignore> {
1556 let contents = smol::block_on(fs.load(&abs_path))?;
1557 let parent = abs_path.parent().unwrap_or(Path::new("/"));
1558 let mut builder = GitignoreBuilder::new(parent);
1559 for line in contents.lines() {
1560 builder.add_line(Some(abs_path.into()), line)?;
1561 }
1562 Ok(builder.build()?)
1563}
1564
1565impl Deref for Worktree {
1566 type Target = Snapshot;
1567
1568 fn deref(&self) -> &Self::Target {
1569 match self {
1570 Worktree::Local(worktree) => &worktree.snapshot,
1571 Worktree::Remote(worktree) => &worktree.snapshot,
1572 }
1573 }
1574}
1575
impl Deref for LocalWorktree {
    type Target = Snapshot;

    /// Dereferences to the worktree's foreground snapshot, so snapshot fields and
    /// methods can be used directly on a `LocalWorktree`.
    fn deref(&self) -> &Self::Target {
        &self.snapshot
    }
}
1583
1584impl fmt::Debug for LocalWorktree {
1585 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1586 self.snapshot.fmt(f)
1587 }
1588}
1589
1590impl RemoteWorktree {
1591 fn get_open_buffer(
1592 &mut self,
1593 path: &Path,
1594 cx: &mut ModelContext<Worktree>,
1595 ) -> Option<ModelHandle<Buffer>> {
1596 let handle = cx.handle();
1597 let mut existing_buffer = None;
1598 self.open_buffers.retain(|_buffer_id, buffer| {
1599 if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
1600 if let Some(file) = buffer.read(cx.as_ref()).file() {
1601 if file.worktree_id() == handle.id() && file.path().as_ref() == path {
1602 existing_buffer = Some(buffer);
1603 }
1604 }
1605 true
1606 } else {
1607 false
1608 }
1609 });
1610 existing_buffer
1611 }
1612
1613 fn open_buffer(
1614 &mut self,
1615 path: &Path,
1616 cx: &mut ModelContext<Worktree>,
1617 ) -> Task<Result<ModelHandle<Buffer>>> {
1618 let rpc = self.client.clone();
1619 let replica_id = self.replica_id;
1620 let remote_worktree_id = self.remote_id;
1621 let root_path = self.snapshot.abs_path.clone();
1622 let path: Arc<Path> = Arc::from(path);
1623 let path_string = path.to_string_lossy().to_string();
1624 cx.spawn_weak(move |this, mut cx| async move {
1625 let entry = this
1626 .upgrade(&cx)
1627 .ok_or_else(|| anyhow!("worktree was closed"))?
1628 .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned())
1629 .ok_or_else(|| anyhow!("file does not exist"))?;
1630 let response = rpc
1631 .request(proto::OpenBuffer {
1632 worktree_id: remote_worktree_id as u64,
1633 path: path_string,
1634 })
1635 .await?;
1636
1637 let this = this
1638 .upgrade(&cx)
1639 .ok_or_else(|| anyhow!("worktree was closed"))?;
1640 let file = File {
1641 entry_id: Some(entry.id),
1642 worktree: this.clone(),
1643 worktree_path: root_path,
1644 path: entry.path,
1645 mtime: entry.mtime,
1646 is_local: false,
1647 };
1648 let language = this.read_with(&cx, |this, _| {
1649 use language::File;
1650 this.languages().select_language(file.full_path()).cloned()
1651 });
1652 let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
1653 let buffer_id = remote_buffer.id as usize;
1654 let buffer = cx.add_model(|cx| {
1655 Buffer::from_proto(replica_id, remote_buffer, Some(Box::new(file)), cx)
1656 .unwrap()
1657 .with_language(language, None, cx)
1658 });
1659 this.update(&mut cx, move |this, cx| {
1660 let this = this.as_remote_mut().unwrap();
1661 if let Some(RemoteBuffer::Operations(pending_ops)) = this
1662 .open_buffers
1663 .insert(buffer_id, RemoteBuffer::Loaded(buffer.downgrade()))
1664 {
1665 buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?;
1666 }
1667 Result::<_, anyhow::Error>::Ok(buffer)
1668 })
1669 })
1670 }
1671
    /// Returns the remote id of this worktree.
    pub fn remote_id(&self) -> u64 {
        self.remote_id
    }
1675
1676 pub fn close_all_buffers(&mut self, cx: &mut MutableAppContext) {
1677 for (_, buffer) in self.open_buffers.drain() {
1678 if let RemoteBuffer::Loaded(buffer) = buffer {
1679 if let Some(buffer) = buffer.upgrade(cx) {
1680 buffer.update(cx, |buffer, cx| buffer.close(cx))
1681 }
1682 }
1683 }
1684 }
1685
    /// Returns a clone of this worktree's snapshot.
    fn snapshot(&self) -> Snapshot {
        self.snapshot.clone()
    }
1689
1690 fn update_from_remote(
1691 &mut self,
1692 envelope: TypedEnvelope<proto::UpdateWorktree>,
1693 cx: &mut ModelContext<Worktree>,
1694 ) -> Result<()> {
1695 let mut tx = self.updates_tx.clone();
1696 let payload = envelope.payload.clone();
1697 cx.background()
1698 .spawn(async move {
1699 tx.send(payload).await.expect("receiver runs to completion");
1700 })
1701 .detach();
1702
1703 Ok(())
1704 }
1705
1706 pub fn add_collaborator(
1707 &mut self,
1708 collaborator: Collaborator,
1709 cx: &mut ModelContext<Worktree>,
1710 ) {
1711 self.collaborators
1712 .insert(collaborator.peer_id, collaborator);
1713 cx.notify();
1714 }
1715
1716 pub fn remove_collaborator(
1717 &mut self,
1718 envelope: TypedEnvelope<proto::RemoveCollaborator>,
1719 cx: &mut ModelContext<Worktree>,
1720 ) -> Result<()> {
1721 let peer_id = PeerId(envelope.payload.peer_id);
1722 let replica_id = self
1723 .collaborators
1724 .remove(&peer_id)
1725 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?
1726 .replica_id;
1727 for (_, buffer) in &self.open_buffers {
1728 if let Some(buffer) = buffer.upgrade(cx) {
1729 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
1730 }
1731 }
1732 cx.notify();
1733 Ok(())
1734 }
1735}
1736
/// State tracked for a buffer id in a remote worktree's `open_buffers` map.
enum RemoteBuffer {
    /// Operations held for a buffer that has not been loaded; they are applied
    /// when a loaded buffer replaces this entry.
    Operations(Vec<Operation>),
    /// A weak handle to a buffer that has been loaded.
    Loaded(WeakModelHandle<Buffer>),
}
1741
1742impl RemoteBuffer {
1743 fn upgrade(&self, cx: &impl UpgradeModelHandle) -> Option<ModelHandle<Buffer>> {
1744 match self {
1745 Self::Operations(_) => None,
1746 Self::Loaded(buffer) => buffer.upgrade(cx),
1747 }
1748 }
1749}
1750
1751impl Snapshot {
    /// Returns this snapshot's `id` field.
    pub fn id(&self) -> usize {
        self.id
    }
1755
1756 pub fn build_update(
1757 &self,
1758 other: &Self,
1759 worktree_id: u64,
1760 include_ignored: bool,
1761 ) -> proto::UpdateWorktree {
1762 let mut updated_entries = Vec::new();
1763 let mut removed_entries = Vec::new();
1764 let mut self_entries = self
1765 .entries_by_id
1766 .cursor::<()>()
1767 .filter(|e| include_ignored || !e.is_ignored)
1768 .peekable();
1769 let mut other_entries = other
1770 .entries_by_id
1771 .cursor::<()>()
1772 .filter(|e| include_ignored || !e.is_ignored)
1773 .peekable();
1774 loop {
1775 match (self_entries.peek(), other_entries.peek()) {
1776 (Some(self_entry), Some(other_entry)) => {
1777 match Ord::cmp(&self_entry.id, &other_entry.id) {
1778 Ordering::Less => {
1779 let entry = self.entry_for_id(self_entry.id).unwrap().into();
1780 updated_entries.push(entry);
1781 self_entries.next();
1782 }
1783 Ordering::Equal => {
1784 if self_entry.scan_id != other_entry.scan_id {
1785 let entry = self.entry_for_id(self_entry.id).unwrap().into();
1786 updated_entries.push(entry);
1787 }
1788
1789 self_entries.next();
1790 other_entries.next();
1791 }
1792 Ordering::Greater => {
1793 removed_entries.push(other_entry.id as u64);
1794 other_entries.next();
1795 }
1796 }
1797 }
1798 (Some(self_entry), None) => {
1799 let entry = self.entry_for_id(self_entry.id).unwrap().into();
1800 updated_entries.push(entry);
1801 self_entries.next();
1802 }
1803 (None, Some(other_entry)) => {
1804 removed_entries.push(other_entry.id as u64);
1805 other_entries.next();
1806 }
1807 (None, None) => break,
1808 }
1809 }
1810
1811 proto::UpdateWorktree {
1812 updated_entries,
1813 removed_entries,
1814 worktree_id,
1815 }
1816 }
1817
1818 fn apply_update(&mut self, update: proto::UpdateWorktree) -> Result<()> {
1819 self.scan_id += 1;
1820 let scan_id = self.scan_id;
1821
1822 let mut entries_by_path_edits = Vec::new();
1823 let mut entries_by_id_edits = Vec::new();
1824 for entry_id in update.removed_entries {
1825 let entry_id = entry_id as usize;
1826 let entry = self
1827 .entry_for_id(entry_id)
1828 .ok_or_else(|| anyhow!("unknown entry"))?;
1829 entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
1830 entries_by_id_edits.push(Edit::Remove(entry.id));
1831 }
1832
1833 for entry in update.updated_entries {
1834 let entry = Entry::try_from((&self.root_char_bag, entry))?;
1835 if let Some(PathEntry { path, .. }) = self.entries_by_id.get(&entry.id, &()) {
1836 entries_by_path_edits.push(Edit::Remove(PathKey(path.clone())));
1837 }
1838 entries_by_id_edits.push(Edit::Insert(PathEntry {
1839 id: entry.id,
1840 path: entry.path.clone(),
1841 is_ignored: entry.is_ignored,
1842 scan_id,
1843 }));
1844 entries_by_path_edits.push(Edit::Insert(entry));
1845 }
1846
1847 self.entries_by_path.edit(entries_by_path_edits, &());
1848 self.entries_by_id.edit(entries_by_id_edits, &());
1849
1850 Ok(())
1851 }
1852
    /// Returns the `file_count` recorded in the summary of the path-ordered
    /// entry tree.
    pub fn file_count(&self) -> usize {
        self.entries_by_path.summary().file_count
    }
1856
    /// Returns the `visible_file_count` recorded in the summary of the
    /// path-ordered entry tree.
    pub fn visible_file_count(&self) -> usize {
        self.entries_by_path.summary().visible_file_count
    }
1860
1861 fn traverse_from_offset(
1862 &self,
1863 include_dirs: bool,
1864 include_ignored: bool,
1865 start_offset: usize,
1866 ) -> Traversal {
1867 let mut cursor = self.entries_by_path.cursor();
1868 cursor.seek(
1869 &TraversalTarget::Count {
1870 count: start_offset,
1871 include_dirs,
1872 include_ignored,
1873 },
1874 Bias::Right,
1875 &(),
1876 );
1877 Traversal {
1878 cursor,
1879 include_dirs,
1880 include_ignored,
1881 }
1882 }
1883
1884 fn traverse_from_path(
1885 &self,
1886 include_dirs: bool,
1887 include_ignored: bool,
1888 path: &Path,
1889 ) -> Traversal {
1890 let mut cursor = self.entries_by_path.cursor();
1891 cursor.seek(&TraversalTarget::Path(path), Bias::Left, &());
1892 Traversal {
1893 cursor,
1894 include_dirs,
1895 include_ignored,
1896 }
1897 }
1898
    /// Returns a traversal that excludes directories, positioned at offset
    /// `start`. Ignored entries are included only when `include_ignored` is set.
    pub fn files(&self, include_ignored: bool, start: usize) -> Traversal {
        self.traverse_from_offset(false, include_ignored, start)
    }
1902
    /// Returns a traversal that includes directories, positioned at offset 0.
    /// Ignored entries are included only when `include_ignored` is set.
    pub fn entries(&self, include_ignored: bool) -> Traversal {
        self.traverse_from_offset(true, include_ignored, 0)
    }
1906
1907 pub fn paths(&self) -> impl Iterator<Item = &Arc<Path>> {
1908 let empty_path = Path::new("");
1909 self.entries_by_path
1910 .cursor::<()>()
1911 .filter(move |entry| entry.path.as_ref() != empty_path)
1912 .map(|entry| &entry.path)
1913 }
1914
1915 fn child_entries<'a>(&'a self, parent_path: &'a Path) -> ChildEntriesIter<'a> {
1916 let mut cursor = self.entries_by_path.cursor();
1917 cursor.seek(&TraversalTarget::Path(parent_path), Bias::Right, &());
1918 let traversal = Traversal {
1919 cursor,
1920 include_dirs: true,
1921 include_ignored: true,
1922 };
1923 ChildEntriesIter {
1924 traversal,
1925 parent_path,
1926 }
1927 }
1928
    /// Returns the entry stored under the empty path, if there is one.
    pub fn root_entry(&self) -> Option<&Entry> {
        self.entry_for_path("")
    }
1932
    /// Returns this snapshot's `root_name`.
    pub fn root_name(&self) -> &str {
        &self.root_name
    }
1936
1937 pub fn entry_for_path(&self, path: impl AsRef<Path>) -> Option<&Entry> {
1938 let path = path.as_ref();
1939 self.traverse_from_path(true, true, path)
1940 .entry()
1941 .and_then(|entry| {
1942 if entry.path.as_ref() == path {
1943 Some(entry)
1944 } else {
1945 None
1946 }
1947 })
1948 }
1949
1950 pub fn entry_for_id(&self, id: usize) -> Option<&Entry> {
1951 let entry = self.entries_by_id.get(&id, &())?;
1952 self.entry_for_path(&entry.path)
1953 }
1954
    /// Returns the inode of the entry at `path`, or `None` if there is no such
    /// entry.
    pub fn inode_for_path(&self, path: impl AsRef<Path>) -> Option<u64> {
        self.entry_for_path(path.as_ref()).map(|e| e.inode)
    }
1958
1959 fn insert_entry(&mut self, mut entry: Entry, fs: &dyn Fs) -> Entry {
1960 if !entry.is_dir() && entry.path.file_name() == Some(&GITIGNORE) {
1961 let abs_path = self.abs_path.join(&entry.path);
1962 match build_gitignore(&abs_path, fs) {
1963 Ok(ignore) => {
1964 let ignore_dir_path = entry.path.parent().unwrap();
1965 self.ignores
1966 .insert(ignore_dir_path.into(), (Arc::new(ignore), self.scan_id));
1967 }
1968 Err(error) => {
1969 log::error!(
1970 "error loading .gitignore file {:?} - {:?}",
1971 &entry.path,
1972 error
1973 );
1974 }
1975 }
1976 }
1977
1978 self.reuse_entry_id(&mut entry);
1979 self.entries_by_path.insert_or_replace(entry.clone(), &());
1980 self.entries_by_id.insert_or_replace(
1981 PathEntry {
1982 id: entry.id,
1983 path: entry.path.clone(),
1984 is_ignored: entry.is_ignored,
1985 scan_id: self.scan_id,
1986 },
1987 &(),
1988 );
1989 entry
1990 }
1991
1992 fn populate_dir(
1993 &mut self,
1994 parent_path: Arc<Path>,
1995 entries: impl IntoIterator<Item = Entry>,
1996 ignore: Option<Arc<Gitignore>>,
1997 ) {
1998 let mut parent_entry = self
1999 .entries_by_path
2000 .get(&PathKey(parent_path.clone()), &())
2001 .unwrap()
2002 .clone();
2003 if let Some(ignore) = ignore {
2004 self.ignores.insert(parent_path, (ignore, self.scan_id));
2005 }
2006 if matches!(parent_entry.kind, EntryKind::PendingDir) {
2007 parent_entry.kind = EntryKind::Dir;
2008 } else {
2009 unreachable!();
2010 }
2011
2012 let mut entries_by_path_edits = vec![Edit::Insert(parent_entry)];
2013 let mut entries_by_id_edits = Vec::new();
2014
2015 for mut entry in entries {
2016 self.reuse_entry_id(&mut entry);
2017 entries_by_id_edits.push(Edit::Insert(PathEntry {
2018 id: entry.id,
2019 path: entry.path.clone(),
2020 is_ignored: entry.is_ignored,
2021 scan_id: self.scan_id,
2022 }));
2023 entries_by_path_edits.push(Edit::Insert(entry));
2024 }
2025
2026 self.entries_by_path.edit(entries_by_path_edits, &());
2027 self.entries_by_id.edit(entries_by_id_edits, &());
2028 }
2029
2030 fn reuse_entry_id(&mut self, entry: &mut Entry) {
2031 if let Some(removed_entry_id) = self.removed_entry_ids.remove(&entry.inode) {
2032 entry.id = removed_entry_id;
2033 } else if let Some(existing_entry) = self.entry_for_path(&entry.path) {
2034 entry.id = existing_entry.id;
2035 }
2036 }
2037
2038 fn remove_path(&mut self, path: &Path) {
2039 let mut new_entries;
2040 let removed_entries;
2041 {
2042 let mut cursor = self.entries_by_path.cursor::<TraversalProgress>();
2043 new_entries = cursor.slice(&TraversalTarget::Path(path), Bias::Left, &());
2044 removed_entries = cursor.slice(&TraversalTarget::PathSuccessor(path), Bias::Left, &());
2045 new_entries.push_tree(cursor.suffix(&()), &());
2046 }
2047 self.entries_by_path = new_entries;
2048
2049 let mut entries_by_id_edits = Vec::new();
2050 for entry in removed_entries.cursor::<()>() {
2051 let removed_entry_id = self
2052 .removed_entry_ids
2053 .entry(entry.inode)
2054 .or_insert(entry.id);
2055 *removed_entry_id = cmp::max(*removed_entry_id, entry.id);
2056 entries_by_id_edits.push(Edit::Remove(entry.id));
2057 }
2058 self.entries_by_id.edit(entries_by_id_edits, &());
2059
2060 if path.file_name() == Some(&GITIGNORE) {
2061 if let Some((_, scan_id)) = self.ignores.get_mut(path.parent().unwrap()) {
2062 *scan_id = self.scan_id;
2063 }
2064 }
2065 }
2066
2067 fn ignore_stack_for_path(&self, path: &Path, is_dir: bool) -> Arc<IgnoreStack> {
2068 let mut new_ignores = Vec::new();
2069 for ancestor in path.ancestors().skip(1) {
2070 if let Some((ignore, _)) = self.ignores.get(ancestor) {
2071 new_ignores.push((ancestor, Some(ignore.clone())));
2072 } else {
2073 new_ignores.push((ancestor, None));
2074 }
2075 }
2076
2077 let mut ignore_stack = IgnoreStack::none();
2078 for (parent_path, ignore) in new_ignores.into_iter().rev() {
2079 if ignore_stack.is_path_ignored(&parent_path, true) {
2080 ignore_stack = IgnoreStack::all();
2081 break;
2082 } else if let Some(ignore) = ignore {
2083 ignore_stack = ignore_stack.append(Arc::from(parent_path), ignore);
2084 }
2085 }
2086
2087 if ignore_stack.is_path_ignored(path, is_dir) {
2088 ignore_stack = IgnoreStack::all();
2089 }
2090
2091 ignore_stack
2092 }
2093}
2094
2095impl fmt::Debug for Snapshot {
2096 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2097 for entry in self.entries_by_path.cursor::<()>() {
2098 for _ in entry.path.ancestors().skip(1) {
2099 write!(f, " ")?;
2100 }
2101 writeln!(f, "{:?} (inode: {})", entry.path, entry.inode)?;
2102 }
2103 Ok(())
2104 }
2105}
2106
/// A handle to a file that belongs to a worktree.
#[derive(Clone, PartialEq)]
pub struct File {
    /// Id of the snapshot entry backing this file; `None` is reported as deleted
    /// by `is_deleted`.
    entry_id: Option<usize>,
    /// Handle to the worktree this file belongs to.
    worktree: ModelHandle<Worktree>,
    /// Path of the worktree; `path` is joined onto it to form the absolute path
    /// of a local file.
    worktree_path: Arc<Path>,
    /// Path of the file, which gets joined onto `worktree_path`.
    pub path: Arc<Path>,
    /// Modification time recorded for the file.
    pub mtime: SystemTime,
    /// Whether the file is local; `abs_path` returns `Some` only when this is set.
    is_local: bool,
}
2116
2117impl language::File for File {
    /// Returns the id of the worktree model handle this file belongs to.
    fn worktree_id(&self) -> usize {
        self.worktree.id()
    }
2121
    /// Returns the id of the snapshot entry backing this file, if any.
    fn entry_id(&self) -> Option<usize> {
        self.entry_id
    }
2125
    /// Returns the modification time recorded for this file.
    fn mtime(&self) -> SystemTime {
        self.mtime
    }
2129
    /// Returns the file's `path` field (the path that is joined onto the
    /// worktree path).
    fn path(&self) -> &Arc<Path> {
        &self.path
    }
2133
2134 fn abs_path(&self) -> Option<PathBuf> {
2135 if self.is_local {
2136 Some(self.worktree_path.join(&self.path))
2137 } else {
2138 None
2139 }
2140 }
2141
2142 fn full_path(&self) -> PathBuf {
2143 let mut full_path = PathBuf::new();
2144 if let Some(worktree_name) = self.worktree_path.file_name() {
2145 full_path.push(worktree_name);
2146 }
2147 full_path.push(&self.path);
2148 full_path
2149 }
2150
2151 /// Returns the last component of this handle's absolute path. If this handle refers to the root
2152 /// of its worktree, then this method will return the name of the worktree itself.
2153 fn file_name<'a>(&'a self) -> Option<OsString> {
2154 self.path
2155 .file_name()
2156 .or_else(|| self.worktree_path.file_name())
2157 .map(Into::into)
2158 }
2159
    /// Returns `true` when the file has no backing entry id.
    fn is_deleted(&self) -> bool {
        self.entry_id.is_none()
    }
2163
2164 fn save(
2165 &self,
2166 buffer_id: u64,
2167 text: Rope,
2168 version: clock::Global,
2169 cx: &mut MutableAppContext,
2170 ) -> Task<Result<(clock::Global, SystemTime)>> {
2171 self.worktree.update(cx, |worktree, cx| match worktree {
2172 Worktree::Local(worktree) => {
2173 let rpc = worktree.client.clone();
2174 let worktree_id = *worktree.remote_id.borrow();
2175 let save = worktree.save(self.path.clone(), text, cx);
2176 cx.background().spawn(async move {
2177 let entry = save.await?;
2178 if let Some(worktree_id) = worktree_id {
2179 rpc.send(proto::BufferSaved {
2180 worktree_id,
2181 buffer_id,
2182 version: (&version).into(),
2183 mtime: Some(entry.mtime.into()),
2184 })
2185 .await?;
2186 }
2187 Ok((version, entry.mtime))
2188 })
2189 }
2190 Worktree::Remote(worktree) => {
2191 let rpc = worktree.client.clone();
2192 let worktree_id = worktree.remote_id;
2193 cx.foreground().spawn(async move {
2194 let response = rpc
2195 .request(proto::SaveBuffer {
2196 worktree_id,
2197 buffer_id,
2198 })
2199 .await?;
2200 let version = response.version.try_into()?;
2201 let mtime = response
2202 .mtime
2203 .ok_or_else(|| anyhow!("missing mtime"))?
2204 .into();
2205 Ok((version, mtime))
2206 })
2207 }
2208 })
2209 }
2210
2211 fn load_local(&self, cx: &AppContext) -> Option<Task<Result<String>>> {
2212 let worktree = self.worktree.read(cx).as_local()?;
2213 let abs_path = worktree.absolutize(&self.path);
2214 let fs = worktree.fs.clone();
2215 Some(
2216 cx.background()
2217 .spawn(async move { fs.load(&abs_path).await }),
2218 )
2219 }
2220
    /// Forwards `operation` for the buffer `buffer_id` to the worktree's
    /// `send_buffer_update`.
    fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) {
        self.worktree.update(cx, |worktree, cx| {
            worktree.send_buffer_update(buffer_id, operation, cx);
        });
    }
2226
2227 fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) {
2228 self.worktree.update(cx, |worktree, cx| {
2229 if let Worktree::Remote(worktree) = worktree {
2230 let worktree_id = worktree.remote_id;
2231 let rpc = worktree.client.clone();
2232 cx.background()
2233 .spawn(async move {
2234 if let Err(error) = rpc
2235 .send(proto::CloseBuffer {
2236 worktree_id,
2237 buffer_id,
2238 })
2239 .await
2240 {
2241 log::error!("error closing remote buffer: {}", error);
2242 }
2243 })
2244 .detach();
2245 }
2246 });
2247 }
2248
    /// Returns a boxed clone of this file as a `language::File` trait object.
    fn boxed_clone(&self) -> Box<dyn language::File> {
        Box::new(self.clone())
    }
2252
    /// Returns `self` as `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }
2256}
2257
/// A single file or directory recorded in a worktree snapshot.
#[derive(Clone, Debug)]
pub struct Entry {
    /// Identifier of the entry. `Entry::new` draws it from a shared counter.
    pub id: usize,
    /// Whether the entry is a file, a directory, or a pending directory.
    pub kind: EntryKind,
    /// Path of the entry; it is the key of the path-ordered entry tree.
    pub path: Arc<Path>,
    /// Inode taken from the filesystem metadata.
    pub inode: u64,
    /// Modification time taken from the filesystem metadata.
    pub mtime: SystemTime,
    /// Whether the filesystem metadata reported a symlink.
    pub is_symlink: bool,
    /// Whether the entry is ignored; `Entry::new` initializes it to `false`.
    pub is_ignored: bool,
}
2268
/// The kind of a snapshot entry.
#[derive(Clone, Debug)]
pub enum EntryKind {
    /// The kind `Entry::new` assigns to a directory.
    PendingDir,
    /// A directory.
    Dir,
    /// A file, carrying the character bag computed for its path.
    File(CharBag),
}
2275
2276impl Entry {
2277 fn new(
2278 path: Arc<Path>,
2279 metadata: &fs::Metadata,
2280 next_entry_id: &AtomicUsize,
2281 root_char_bag: CharBag,
2282 ) -> Self {
2283 Self {
2284 id: next_entry_id.fetch_add(1, SeqCst),
2285 kind: if metadata.is_dir {
2286 EntryKind::PendingDir
2287 } else {
2288 EntryKind::File(char_bag_for_path(root_char_bag, &path))
2289 },
2290 path,
2291 inode: metadata.inode,
2292 mtime: metadata.mtime,
2293 is_symlink: metadata.is_symlink,
2294 is_ignored: false,
2295 }
2296 }
2297
2298 pub fn is_dir(&self) -> bool {
2299 matches!(self.kind, EntryKind::Dir | EntryKind::PendingDir)
2300 }
2301
2302 pub fn is_file(&self) -> bool {
2303 matches!(self.kind, EntryKind::File(_))
2304 }
2305}
2306
2307impl sum_tree::Item for Entry {
2308 type Summary = EntrySummary;
2309
2310 fn summary(&self) -> Self::Summary {
2311 let visible_count = if self.is_ignored { 0 } else { 1 };
2312 let file_count;
2313 let visible_file_count;
2314 if self.is_file() {
2315 file_count = 1;
2316 visible_file_count = visible_count;
2317 } else {
2318 file_count = 0;
2319 visible_file_count = 0;
2320 }
2321
2322 EntrySummary {
2323 max_path: self.path.clone(),
2324 count: 1,
2325 visible_count,
2326 file_count,
2327 visible_file_count,
2328 }
2329 }
2330}
2331
2332impl sum_tree::KeyedItem for Entry {
2333 type Key = PathKey;
2334
2335 fn key(&self) -> Self::Key {
2336 PathKey(self.path.clone())
2337 }
2338}
2339
/// Aggregate information about a run of `Entry` items in the path-ordered tree.
#[derive(Clone, Debug)]
pub struct EntrySummary {
    /// Path of the last entry summarized.
    max_path: Arc<Path>,
    /// Number of entries (files and directories, ignored or not).
    count: usize,
    /// Number of entries that are not ignored.
    visible_count: usize,
    /// Number of entries that are files.
    file_count: usize,
    /// Number of entries that are files and not ignored.
    visible_file_count: usize,
}
2348
2349impl Default for EntrySummary {
2350 fn default() -> Self {
2351 Self {
2352 max_path: Arc::from(Path::new("")),
2353 count: 0,
2354 visible_count: 0,
2355 file_count: 0,
2356 visible_file_count: 0,
2357 }
2358 }
2359}
2360
2361impl sum_tree::Summary for EntrySummary {
2362 type Context = ();
2363
2364 fn add_summary(&mut self, rhs: &Self, _: &()) {
2365 self.max_path = rhs.max_path.clone();
2366 self.visible_count += rhs.visible_count;
2367 self.file_count += rhs.file_count;
2368 self.visible_file_count += rhs.visible_file_count;
2369 }
2370}
2371
/// Record stored in the id-keyed tree, linking an entry id to its path.
#[derive(Clone, Debug)]
struct PathEntry {
    /// Id of the entry; also the key of this record.
    id: usize,
    /// Path of the entry with this id.
    path: Arc<Path>,
    /// Ignore status of the entry (updated alongside the entry in `update_ignore_status`).
    is_ignored: bool,
    /// Scan id recorded for this entry; `update_ignore_status` sets it to the snapshot's
    /// current `scan_id` whenever the ignore status changes.
    scan_id: usize,
}
2379
2380impl sum_tree::Item for PathEntry {
2381 type Summary = PathEntrySummary;
2382
2383 fn summary(&self) -> Self::Summary {
2384 PathEntrySummary { max_id: self.id }
2385 }
2386}
2387
2388impl sum_tree::KeyedItem for PathEntry {
2389 type Key = usize;
2390
2391 fn key(&self) -> Self::Key {
2392 self.id
2393 }
2394}
2395
/// Summary of a run of `PathEntry` records.
#[derive(Clone, Debug, Default)]
struct PathEntrySummary {
    /// Id of the last record summarized (see the `add_summary` impl, which overwrites it).
    max_id: usize,
}
2400
2401impl sum_tree::Summary for PathEntrySummary {
2402 type Context = ();
2403
2404 fn add_summary(&mut self, summary: &Self, _: &Self::Context) {
2405 self.max_id = summary.max_id;
2406 }
2407}
2408
2409impl<'a> sum_tree::Dimension<'a, PathEntrySummary> for usize {
2410 fn add_summary(&mut self, summary: &'a PathEntrySummary, _: &()) {
2411 *self = summary.max_id;
2412 }
2413}
2414
/// Key type for `Entry` items: a wrapper around the entry's path, ordered by the path's
/// derived `Ord`.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct PathKey(Arc<Path>);
2417
2418impl Default for PathKey {
2419 fn default() -> Self {
2420 Self(Path::new("").into())
2421 }
2422}
2423
2424impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
2425 fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
2426 self.0 = summary.max_path.clone();
2427 }
2428}
2429
/// Scans the worktree's directory tree and keeps the shared snapshot up to date as
/// file-system events arrive.
struct BackgroundScanner {
    /// File-system abstraction used for directory listing, metadata and canonicalization.
    fs: Arc<dyn Fs>,
    /// Snapshot shared with the rest of the worktree; updated in place under the mutex.
    snapshot: Arc<Mutex<Snapshot>>,
    /// Channel on which `Scanning` / `Idle` / `Err` state changes are reported.
    notify: Sender<ScanState>,
    /// Background executor used to run scoped scan tasks (`num_cpus` tasks per batch).
    executor: Arc<executor::Background>,
}
2436
2437impl BackgroundScanner {
2438 fn new(
2439 snapshot: Arc<Mutex<Snapshot>>,
2440 notify: Sender<ScanState>,
2441 fs: Arc<dyn Fs>,
2442 executor: Arc<executor::Background>,
2443 ) -> Self {
2444 Self {
2445 fs,
2446 snapshot,
2447 notify,
2448 executor,
2449 }
2450 }
2451
2452 fn abs_path(&self) -> Arc<Path> {
2453 self.snapshot.lock().abs_path.clone()
2454 }
2455
2456 fn snapshot(&self) -> Snapshot {
2457 self.snapshot.lock().clone()
2458 }
2459
2460 async fn run(mut self, events_rx: impl Stream<Item = Vec<fsevent::Event>>) {
2461 if self.notify.send(ScanState::Scanning).await.is_err() {
2462 return;
2463 }
2464
2465 if let Err(err) = self.scan_dirs().await {
2466 if self
2467 .notify
2468 .send(ScanState::Err(Arc::new(err)))
2469 .await
2470 .is_err()
2471 {
2472 return;
2473 }
2474 }
2475
2476 if self.notify.send(ScanState::Idle).await.is_err() {
2477 return;
2478 }
2479
2480 futures::pin_mut!(events_rx);
2481 while let Some(events) = events_rx.next().await {
2482 if self.notify.send(ScanState::Scanning).await.is_err() {
2483 break;
2484 }
2485
2486 if !self.process_events(events).await {
2487 break;
2488 }
2489
2490 if self.notify.send(ScanState::Idle).await.is_err() {
2491 break;
2492 }
2493 }
2494 }
2495
/// Performs the initial recursive scan of the worktree.
///
/// When the snapshot's root entry is a directory, a `ScanJob` for the root is queued and
/// `num_cpus` scoped tasks drain the queue, each calling `scan_dir` (which may queue
/// further jobs for subdirectories). A failure to scan an individual directory is logged
/// and does not abort the scan. Nothing is scanned when the root entry is absent or is
/// not a directory.
async fn scan_dirs(&mut self) -> Result<()> {
    let root_char_bag;
    let next_entry_id;
    let is_dir;
    {
        // Hold the snapshot lock only long enough to copy out what the scan needs.
        let snapshot = self.snapshot.lock();
        root_char_bag = snapshot.root_char_bag;
        next_entry_id = snapshot.next_entry_id.clone();
        is_dir = snapshot.root_entry().map_or(false, |e| e.is_dir())
    };

    if is_dir {
        let path: Arc<Path> = Arc::from(Path::new(""));
        let abs_path = self.abs_path();
        let (tx, rx) = channel::unbounded();
        tx.send(ScanJob {
            abs_path: abs_path.to_path_buf(),
            path,
            ignore_stack: IgnoreStack::none(),
            scan_queue: tx.clone(),
        })
        .await
        .unwrap();
        // Drop our own sender: from here on only queued jobs hold senders, so `recv`
        // below starts failing (ending the worker loops) once all jobs are done.
        drop(tx);

        self.executor
            .scoped(|scope| {
                for _ in 0..self.executor.num_cpus() {
                    scope.spawn(async {
                        while let Ok(job) = rx.recv().await {
                            if let Err(err) = self
                                .scan_dir(root_char_bag, next_entry_id.clone(), &job)
                                .await
                            {
                                log::error!("error scanning {:?}: {}", job.abs_path, err);
                            }
                        }
                    });
                }
            })
            .await;
    }

    Ok(())
}
2541
/// Scans the single directory described by `job`.
///
/// Every child gets an `Entry` (ids drawn from `next_entry_id`, file char bags derived
/// from `root_char_bag`) whose ignore status is computed from the job's ignore stack,
/// extended by a `.gitignore` found in this directory. The entries are stored in the
/// shared snapshot via `populate_dir`, and one new `ScanJob` per child directory is
/// pushed onto the job's scan queue.
///
/// Returns an error if the directory cannot be listed or a child's metadata lookup
/// fails; individual unreadable directory entries are logged and skipped.
async fn scan_dir(
    &self,
    root_char_bag: CharBag,
    next_entry_id: Arc<AtomicUsize>,
    job: &ScanJob,
) -> Result<()> {
    let mut new_entries: Vec<Entry> = Vec::new();
    let mut new_jobs: Vec<ScanJob> = Vec::new();
    let mut ignore_stack = job.ignore_stack.clone();
    let mut new_ignore = None;

    let mut child_paths = self.fs.read_dir(&job.abs_path).await?;
    while let Some(child_abs_path) = child_paths.next().await {
        let child_abs_path = match child_abs_path {
            Ok(child_abs_path) => child_abs_path,
            Err(error) => {
                log::error!("error processing entry {:?}", error);
                continue;
            }
        };
        let child_name = child_abs_path.file_name().unwrap();
        let child_path: Arc<Path> = job.path.join(child_name).into();
        // A child without metadata is skipped (presumably it disappeared after being
        // listed — verify against the `Fs::metadata` contract).
        let child_metadata = match self.fs.metadata(&child_abs_path).await? {
            Some(metadata) => metadata,
            None => continue,
        };

        // If we find a .gitignore, add it to the stack of ignores used to determine which paths are ignored
        if child_name == *GITIGNORE {
            match build_gitignore(&child_abs_path, self.fs.as_ref()) {
                Ok(ignore) => {
                    let ignore = Arc::new(ignore);
                    ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
                    new_ignore = Some(ignore);
                }
                Err(error) => {
                    log::error!(
                        "error loading .gitignore file {:?} - {:?}",
                        child_name,
                        error
                    );
                }
            }

            // Update ignore status of any child entries we've already processed to reflect the
            // ignore file in the current directory. Because `.gitignore` starts with a `.`,
            // there should rarely be many such entries. Update the ignore stack associated
            // with any new jobs as well.
            //
            // `new_jobs` holds one job per directory entry in `new_entries`, in the same
            // order, so the two can be walked in lockstep.
            let mut new_jobs = new_jobs.iter_mut();
            for entry in &mut new_entries {
                entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
                if entry.is_dir() {
                    new_jobs.next().unwrap().ignore_stack = if entry.is_ignored {
                        IgnoreStack::all()
                    } else {
                        ignore_stack.clone()
                    };
                }
            }
        }

        let mut child_entry = Entry::new(
            child_path.clone(),
            &child_metadata,
            &next_entry_id,
            root_char_bag,
        );

        if child_metadata.is_dir {
            let is_ignored = ignore_stack.is_path_ignored(&child_path, true);
            child_entry.is_ignored = is_ignored;
            new_entries.push(child_entry);
            // Everything beneath an ignored directory is ignored, so its job gets the
            // "ignore all" stack instead of the current one.
            new_jobs.push(ScanJob {
                abs_path: child_abs_path,
                path: child_path,
                ignore_stack: if is_ignored {
                    IgnoreStack::all()
                } else {
                    ignore_stack.clone()
                },
                scan_queue: job.scan_queue.clone(),
            });
        } else {
            child_entry.is_ignored = ignore_stack.is_path_ignored(&child_path, false);
            new_entries.push(child_entry);
        };
    }

    // Publish this directory's entries first, then queue its subdirectories.
    self.snapshot
        .lock()
        .populate_dir(job.path.clone(), new_entries, new_ignore);
    for new_job in new_jobs {
        job.scan_queue.send(new_job).await.unwrap();
    }

    Ok(())
}
2639
/// Applies one batch of file-system events to the snapshot.
///
/// Works on a private clone of the snapshot with an incremented `scan_id`: every
/// affected path is removed, then re-inserted from fresh metadata if it still exists.
/// The clone is then written back to the shared snapshot, directories touched by the
/// batch are rescanned in parallel, and ignore statuses are refreshed.
///
/// Returns `false` when the worktree root can no longer be canonicalized (the caller
/// stops scanning in that case) and `true` otherwise.
async fn process_events(&mut self, mut events: Vec<fsevent::Event>) -> bool {
    let mut snapshot = self.snapshot();
    snapshot.scan_id += 1;

    let root_abs_path = if let Ok(abs_path) = self.fs.canonicalize(&snapshot.abs_path).await {
        abs_path
    } else {
        return false;
    };
    let root_char_bag = snapshot.root_char_bag;
    let next_entry_id = snapshot.next_entry_id.clone();

    // After sorting, an event whose path lies under the previously kept event's path is
    // dropped: handling the ancestor path covers it.
    events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
    events.dedup_by(|a, b| a.path.starts_with(&b.path));

    // First pass: remove every affected path from the snapshot.
    for event in &events {
        match event.path.strip_prefix(&root_abs_path) {
            Ok(path) => snapshot.remove_path(&path),
            Err(_) => {
                log::error!(
                    "unexpected event {:?} for root path {:?}",
                    event.path,
                    root_abs_path
                );
                continue;
            }
        }
    }

    // Second pass: re-insert whatever still exists on disk, queueing directories for a
    // rescan.
    let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
    for event in events {
        let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) {
            Ok(path) => Arc::from(path.to_path_buf()),
            Err(_) => {
                log::error!(
                    "unexpected event {:?} for root path {:?}",
                    event.path,
                    root_abs_path
                );
                continue;
            }
        };

        match self.fs.metadata(&event.path).await {
            Ok(Some(metadata)) => {
                let ignore_stack = snapshot.ignore_stack_for_path(&path, metadata.is_dir);
                let mut fs_entry = Entry::new(
                    path.clone(),
                    &metadata,
                    snapshot.next_entry_id.as_ref(),
                    snapshot.root_char_bag,
                );
                fs_entry.is_ignored = ignore_stack.is_all();
                snapshot.insert_entry(fs_entry, self.fs.as_ref());
                if metadata.is_dir {
                    scan_queue_tx
                        .send(ScanJob {
                            abs_path: event.path,
                            path,
                            ignore_stack,
                            scan_queue: scan_queue_tx.clone(),
                        })
                        .await
                        .unwrap();
                }
            }
            // No metadata: the path stays removed.
            Ok(None) => {}
            Err(err) => {
                // TODO - create a special 'error' entry in the entries tree to mark this
                log::error!("error reading file on event {:?}", err);
            }
        }
    }

    // Publish the updated clone before the parallel rescan, which writes to the shared
    // snapshot through `scan_dir`.
    *self.snapshot.lock() = snapshot;

    // Scan any directories that were created as part of this event batch.
    drop(scan_queue_tx);
    self.executor
        .scoped(|scope| {
            for _ in 0..self.executor.num_cpus() {
                scope.spawn(async {
                    while let Ok(job) = scan_queue_rx.recv().await {
                        if let Err(err) = self
                            .scan_dir(root_char_bag, next_entry_id.clone(), &job)
                            .await
                        {
                            log::error!("error scanning {:?}: {}", job.abs_path, err);
                        }
                    }
                });
            }
        })
        .await;

    // Attempt to detect renames only over a single batch of file-system events.
    self.snapshot.lock().removed_entry_ids.clear();

    self.update_ignore_statuses().await;
    true
}
2741
/// Recomputes ignore statuses after a batch of events.
///
/// Ignore files recorded with the current `scan_id` (and whose directory still has an
/// entry) trigger a recomputation of everything beneath their directory; ignore records
/// whose `.gitignore` entry no longer exists are dropped from both the local clone and
/// the shared snapshot. The recomputation runs as `UpdateIgnoreStatusJob`s processed by
/// `num_cpus` scoped tasks.
async fn update_ignore_statuses(&self) {
    let mut snapshot = self.snapshot();

    let mut ignores_to_update = Vec::new();
    let mut ignores_to_delete = Vec::new();
    for (parent_path, (_, scan_id)) in &snapshot.ignores {
        if *scan_id == snapshot.scan_id && snapshot.entry_for_path(parent_path).is_some() {
            ignores_to_update.push(parent_path.clone());
        }

        let ignore_path = parent_path.join(&*GITIGNORE);
        if snapshot.entry_for_path(ignore_path).is_none() {
            ignores_to_delete.push(parent_path.clone());
        }
    }

    for parent_path in ignores_to_delete {
        snapshot.ignores.remove(&parent_path);
        self.snapshot.lock().ignores.remove(&parent_path);
    }

    let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
    ignores_to_update.sort_unstable();
    let mut ignores_to_update = ignores_to_update.into_iter().peekable();
    while let Some(parent_path) = ignores_to_update.next() {
        // Skip directories nested under `parent_path`: the job for `parent_path`
        // recurses into them anyway.
        while ignores_to_update
            .peek()
            .map_or(false, |p| p.starts_with(&parent_path))
        {
            ignores_to_update.next().unwrap();
        }

        let ignore_stack = snapshot.ignore_stack_for_path(&parent_path, true);
        ignore_queue_tx
            .send(UpdateIgnoreStatusJob {
                path: parent_path,
                ignore_stack,
                ignore_queue: ignore_queue_tx.clone(),
            })
            .await
            .unwrap();
    }
    // Only queued jobs hold senders from here on, so the workers below stop once the
    // queue has been fully drained.
    drop(ignore_queue_tx);

    self.executor
        .scoped(|scope| {
            for _ in 0..self.executor.num_cpus() {
                scope.spawn(async {
                    while let Ok(job) = ignore_queue_rx.recv().await {
                        self.update_ignore_status(job, &snapshot).await;
                    }
                });
            }
        })
        .await;
}
2798
/// Recomputes the ignore status of the direct children of `job.path`.
///
/// Children are read from the given `snapshot` clone; the job's ignore stack is
/// extended with the ignore file recorded for `job.path`, if any. A follow-up job is
/// queued for each child directory, and entries whose status changed are written to
/// both trees of the shared snapshot.
async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) {
    let mut ignore_stack = job.ignore_stack;
    if let Some((ignore, _)) = snapshot.ignores.get(&job.path) {
        ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
    }

    let mut entries_by_id_edits = Vec::new();
    let mut entries_by_path_edits = Vec::new();
    for mut entry in snapshot.child_entries(&job.path).cloned() {
        let was_ignored = entry.is_ignored;
        entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
        if entry.is_dir() {
            // Everything beneath an ignored directory is ignored as well.
            let child_ignore_stack = if entry.is_ignored {
                IgnoreStack::all()
            } else {
                ignore_stack.clone()
            };
            job.ignore_queue
                .send(UpdateIgnoreStatusJob {
                    path: entry.path.clone(),
                    ignore_stack: child_ignore_stack,
                    ignore_queue: job.ignore_queue.clone(),
                })
                .await
                .unwrap();
        }

        // Only entries whose status actually changed are re-inserted.
        if entry.is_ignored != was_ignored {
            let mut path_entry = snapshot.entries_by_id.get(&entry.id, &()).unwrap().clone();
            path_entry.scan_id = snapshot.scan_id;
            path_entry.is_ignored = entry.is_ignored;
            entries_by_id_edits.push(Edit::Insert(path_entry));
            entries_by_path_edits.push(Edit::Insert(entry));
        }
    }

    // Apply the collected edits to the shared snapshot under its lock.
    let mut snapshot = self.snapshot.lock();
    snapshot.entries_by_path.edit(entries_by_path_edits, &());
    snapshot.entries_by_id.edit(entries_by_id_edits, &());
}
2839}
2840
2841async fn refresh_entry(
2842 fs: &dyn Fs,
2843 snapshot: &Mutex<Snapshot>,
2844 path: Arc<Path>,
2845 abs_path: &Path,
2846) -> Result<Entry> {
2847 let root_char_bag;
2848 let next_entry_id;
2849 {
2850 let snapshot = snapshot.lock();
2851 root_char_bag = snapshot.root_char_bag;
2852 next_entry_id = snapshot.next_entry_id.clone();
2853 }
2854 let entry = Entry::new(
2855 path,
2856 &fs.metadata(abs_path)
2857 .await?
2858 .ok_or_else(|| anyhow!("could not read saved file metadata"))?,
2859 &next_entry_id,
2860 root_char_bag,
2861 );
2862 Ok(snapshot.lock().insert_entry(entry, fs))
2863}
2864
2865fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
2866 let mut result = root_char_bag;
2867 result.extend(
2868 path.to_string_lossy()
2869 .chars()
2870 .map(|c| c.to_ascii_lowercase()),
2871 );
2872 result
2873}
2874
/// A request to scan one directory, processed by `BackgroundScanner::scan_dir`.
struct ScanJob {
    /// Absolute path of the directory to list.
    abs_path: PathBuf,
    /// Path of the same directory as stored in entries (`""` for the root).
    path: Arc<Path>,
    /// Ignore rules in effect for this directory.
    ignore_stack: Arc<IgnoreStack>,
    /// Queue onto which jobs for child directories are pushed.
    scan_queue: Sender<ScanJob>,
}
2881
/// A request to recompute the ignore status of the children of one directory, processed
/// by `BackgroundScanner::update_ignore_status`.
struct UpdateIgnoreStatusJob {
    /// Directory whose children are re-evaluated.
    path: Arc<Path>,
    /// Ignore rules in effect for `path`, before its own ignore file is appended.
    ignore_stack: Arc<IgnoreStack>,
    /// Queue onto which follow-up jobs for child directories are pushed.
    ignore_queue: Sender<UpdateIgnoreStatusJob>,
}
2887
/// Extension methods for worktree handles. The only method is compiled for tests.
pub trait WorktreeHandle {
    /// Returns a future that completes once pending file-system events for the worktree
    /// have been processed (test builds only).
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()>;
}
2895
impl WorktreeHandle for ModelHandle<Worktree> {
    // The worktree's FS event stream sometimes delivers "redundant" events for FS changes that
    // occurred before the worktree was constructed. These events can cause the worktree to perform
    // extra directory scans, and emit extra scan-state notifications.
    //
    // This function mutates the worktree's directory and waits for those mutations to be picked up,
    // to ensure that all redundant FS events have already been processed.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()> {
        use smol::future::FutureExt;

        let filename = "fs-event-sentinel";
        let root_path = cx.read(|cx| self.read(cx).abs_path.clone());
        let tree = self.clone();
        async move {
            // Create a sentinel file and wait until the worktree has an entry for it.
            std::fs::write(root_path.join(filename), "").unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_some())
                .await;

            // Remove the sentinel again and wait until its entry is gone.
            std::fs::remove_file(root_path.join(filename)).unwrap();
            tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_none())
                .await;

            // Finally wait for the local worktree to report that scanning is complete.
            cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
                .await;
        }
        .boxed_local()
    }
}
2928
/// Cursor dimension used by `Traversal`: the last path passed so far plus running
/// counts of the entries passed, broken down by kind and visibility.
#[derive(Clone, Debug)]
struct TraversalProgress<'a> {
    max_path: &'a Path,
    count: usize,
    visible_count: usize,
    file_count: usize,
    visible_file_count: usize,
}

impl<'a> TraversalProgress<'a> {
    /// Selects the running count that matches the given filter: all entries, files
    /// only, non-ignored entries only, or non-ignored files only.
    fn count(&self, include_dirs: bool, include_ignored: bool) -> usize {
        if include_dirs {
            if include_ignored {
                self.count
            } else {
                self.visible_count
            }
        } else if include_ignored {
            self.file_count
        } else {
            self.visible_file_count
        }
    }
}
2948
2949impl<'a> sum_tree::Dimension<'a, EntrySummary> for TraversalProgress<'a> {
2950 fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
2951 self.max_path = summary.max_path.as_ref();
2952 self.count += summary.count;
2953 self.visible_count += summary.visible_count;
2954 self.file_count += summary.file_count;
2955 self.visible_file_count += summary.visible_file_count;
2956 }
2957}
2958
2959impl<'a> Default for TraversalProgress<'a> {
2960 fn default() -> Self {
2961 Self {
2962 max_path: Path::new(""),
2963 count: 0,
2964 visible_count: 0,
2965 file_count: 0,
2966 visible_file_count: 0,
2967 }
2968 }
2969}
2970
/// A filtered, forward-only walk over the entries of a snapshot.
pub struct Traversal<'a> {
    /// Cursor over the entry tree, tracking `TraversalProgress` as it moves.
    cursor: sum_tree::Cursor<'a, Entry, TraversalProgress<'a>>,
    /// Whether ignored entries are part of the walk.
    include_ignored: bool,
    /// Whether directories are part of the walk.
    include_dirs: bool,
}
2976
2977impl<'a> Traversal<'a> {
2978 pub fn advance(&mut self) -> bool {
2979 self.advance_to_offset(self.offset() + 1)
2980 }
2981
2982 pub fn advance_to_offset(&mut self, offset: usize) -> bool {
2983 self.cursor.seek_forward(
2984 &TraversalTarget::Count {
2985 count: offset,
2986 include_dirs: self.include_dirs,
2987 include_ignored: self.include_ignored,
2988 },
2989 Bias::Right,
2990 &(),
2991 )
2992 }
2993
2994 pub fn advance_to_sibling(&mut self) -> bool {
2995 while let Some(entry) = self.cursor.item() {
2996 self.cursor.seek_forward(
2997 &TraversalTarget::PathSuccessor(&entry.path),
2998 Bias::Left,
2999 &(),
3000 );
3001 if let Some(entry) = self.cursor.item() {
3002 if (self.include_dirs || !entry.is_dir())
3003 && (self.include_ignored || !entry.is_ignored)
3004 {
3005 return true;
3006 }
3007 }
3008 }
3009 false
3010 }
3011
3012 pub fn entry(&self) -> Option<&'a Entry> {
3013 self.cursor.item()
3014 }
3015
3016 pub fn offset(&self) -> usize {
3017 self.cursor
3018 .start()
3019 .count(self.include_dirs, self.include_ignored)
3020 }
3021}
3022
3023impl<'a> Iterator for Traversal<'a> {
3024 type Item = &'a Entry;
3025
3026 fn next(&mut self) -> Option<Self::Item> {
3027 if let Some(item) = self.entry() {
3028 self.advance();
3029 Some(item)
3030 } else {
3031 None
3032 }
3033 }
3034}
3035
/// Seek targets understood by a `Traversal` cursor.
#[derive(Debug)]
enum TraversalTarget<'a> {
    /// Seek by comparing this path against the cursor's maximum path.
    Path(&'a Path),
    /// Seek to the first position whose maximum path no longer starts with this path.
    PathSuccessor(&'a Path),
    /// Seek to an entry offset, counted under the given filter.
    Count {
        count: usize,
        include_ignored: bool,
        include_dirs: bool,
    },
}
3046
3047impl<'a, 'b> SeekTarget<'a, EntrySummary, TraversalProgress<'a>> for TraversalTarget<'b> {
3048 fn cmp(&self, cursor_location: &TraversalProgress<'a>, _: &()) -> Ordering {
3049 match self {
3050 TraversalTarget::Path(path) => path.cmp(&cursor_location.max_path),
3051 TraversalTarget::PathSuccessor(path) => {
3052 if !cursor_location.max_path.starts_with(path) {
3053 Ordering::Equal
3054 } else {
3055 Ordering::Greater
3056 }
3057 }
3058 TraversalTarget::Count {
3059 count,
3060 include_dirs,
3061 include_ignored,
3062 } => Ord::cmp(
3063 count,
3064 &cursor_location.count(*include_dirs, *include_ignored),
3065 ),
3066 }
3067 }
3068}
3069
/// Iterator over the entries found under `parent_path`, stepping from sibling to sibling.
struct ChildEntriesIter<'a> {
    /// Path every yielded entry must start with.
    parent_path: &'a Path,
    /// Underlying traversal; NOTE(review): presumably positioned at the first child by
    /// whoever constructs this iterator — confirm at the construction site.
    traversal: Traversal<'a>,
}
3074
3075impl<'a> Iterator for ChildEntriesIter<'a> {
3076 type Item = &'a Entry;
3077
3078 fn next(&mut self) -> Option<Self::Item> {
3079 if let Some(item) = self.traversal.entry() {
3080 if item.path.starts_with(&self.parent_path) {
3081 self.traversal.advance_to_sibling();
3082 return Some(item);
3083 }
3084 }
3085 None
3086 }
3087}
3088
3089impl<'a> From<&'a Entry> for proto::Entry {
3090 fn from(entry: &'a Entry) -> Self {
3091 Self {
3092 id: entry.id as u64,
3093 is_dir: entry.is_dir(),
3094 path: entry.path.to_string_lossy().to_string(),
3095 inode: entry.inode,
3096 mtime: Some(entry.mtime.into()),
3097 is_symlink: entry.is_symlink,
3098 is_ignored: entry.is_ignored,
3099 }
3100 }
3101}
3102
3103impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
3104 type Error = anyhow::Error;
3105
3106 fn try_from((root_char_bag, entry): (&'a CharBag, proto::Entry)) -> Result<Self> {
3107 if let Some(mtime) = entry.mtime {
3108 let kind = if entry.is_dir {
3109 EntryKind::Dir
3110 } else {
3111 let mut char_bag = root_char_bag.clone();
3112 char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase()));
3113 EntryKind::File(char_bag)
3114 };
3115 let path: Arc<Path> = Arc::from(Path::new(&entry.path));
3116 Ok(Entry {
3117 id: entry.id as usize,
3118 kind,
3119 path: path.clone(),
3120 inode: entry.inode,
3121 mtime: mtime.into(),
3122 is_symlink: entry.is_symlink,
3123 is_ignored: entry.is_ignored,
3124 })
3125 } else {
3126 Err(anyhow!(
3127 "missing mtime in remote worktree entry {:?}",
3128 entry.path
3129 ))
3130 }
3131 }
3132}
3133
/// Conversion of a position type into a `PointUtf16`.
trait ToPointUtf16 {
    /// Consumes `self` and returns the equivalent `PointUtf16`.
    fn to_point_utf16(self) -> PointUtf16;
}
3137
3138impl ToPointUtf16 for lsp::Position {
3139 fn to_point_utf16(self) -> PointUtf16 {
3140 PointUtf16::new(self.line, self.character)
3141 }
3142}
3143
3144fn diagnostic_ranges<'a>(
3145 diagnostic: &'a lsp::Diagnostic,
3146 abs_path: &'a Path,
3147) -> impl 'a + Iterator<Item = Range<PointUtf16>> {
3148 diagnostic
3149 .related_information
3150 .iter()
3151 .flatten()
3152 .filter_map(move |info| {
3153 if info.location.uri.to_file_path().ok()? == abs_path {
3154 let info_start = PointUtf16::new(
3155 info.location.range.start.line,
3156 info.location.range.start.character,
3157 );
3158 let info_end = PointUtf16::new(
3159 info.location.range.end.line,
3160 info.location.range.end.character,
3161 );
3162 Some(info_start..info_end)
3163 } else {
3164 None
3165 }
3166 })
3167 .chain(Some(
3168 diagnostic.range.start.to_point_utf16()..diagnostic.range.end.to_point_utf16(),
3169 ))
3170}
3171
3172#[cfg(test)]
3173mod tests {
3174 use super::*;
3175 use crate::fs::FakeFs;
3176 use anyhow::Result;
3177 use client::test::{FakeHttpClient, FakeServer};
3178 use fs::RealFs;
3179 use language::{tree_sitter_rust, DiagnosticEntry, LanguageServerConfig};
3180 use language::{Diagnostic, LanguageConfig};
3181 use lsp::Url;
3182 use rand::prelude::*;
3183 use serde_json::json;
3184 use std::{cell::RefCell, rc::Rc};
3185 use std::{
3186 env,
3187 fmt::Write,
3188 time::{SystemTime, UNIX_EPOCH},
3189 };
3190 use text::Point;
3191 use unindent::Unindent as _;
3192 use util::test::temp_tree;
3193
// Checks that iterating the worktree's entries with `entries(false)` skips the file
// matched by `.gitignore` (`a/b`) while still yielding the root, the `.gitignore`
// itself, the directory `a`, and the file `a/c`.
#[gpui::test]
async fn test_traversal(mut cx: gpui::TestAppContext) {
    let fs = FakeFs::new();
    fs.insert_tree(
        "/root",
        json!({
           ".gitignore": "a/b\n",
           "a": {
               "b": "",
               "c": "",
           }
        }),
    )
    .await;

    let client = Client::new();
    let http_client = FakeHttpClient::with_404_response();
    let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));

    let tree = Worktree::open_local(
        client,
        user_store,
        Arc::from(Path::new("/root")),
        Arc::new(fs),
        Default::default(),
        &mut cx.to_async(),
    )
    .await
    .unwrap();
    // Wait for the initial scan before inspecting the entries.
    cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
        .await;

    tree.read_with(&cx, |tree, _| {
        assert_eq!(
            tree.entries(false)
                .map(|entry| entry.path.as_ref())
                .collect::<Vec<_>>(),
            vec![
                Path::new(""),
                Path::new(".gitignore"),
                Path::new("a"),
                Path::new("a/c"),
            ]
        );
    })
}
3240
// Checks that saving an edited buffer writes the buffer's text to the file on disk, for
// a worktree rooted at a directory on the real file system.
#[gpui::test]
async fn test_save_file(mut cx: gpui::TestAppContext) {
    let dir = temp_tree(json!({
        "file1": "the old contents",
    }));

    let client = Client::new();
    let http_client = FakeHttpClient::with_404_response();
    let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));

    let tree = Worktree::open_local(
        client,
        user_store,
        dir.path(),
        Arc::new(RealFs),
        Default::default(),
        &mut cx.to_async(),
    )
    .await
    .unwrap();
    let buffer = tree
        .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
        .await
        .unwrap();
    // Insert a sizeable amount of text at the start of the buffer, then save it.
    let save = buffer.update(&mut cx, |buffer, cx| {
        buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
        buffer.save(cx).unwrap()
    });
    save.await.unwrap();

    // The file on disk must now match the buffer exactly.
    let new_text = std::fs::read_to_string(dir.path().join("file1")).unwrap();
    assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
}
3274
// Checks saving when the worktree root is a single file rather than a directory: the
// worktree reports exactly one file, the buffer is opened with the empty path, and a
// save writes the buffer's text to that file.
#[gpui::test]
async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) {
    let dir = temp_tree(json!({
        "file1": "the old contents",
    }));
    let file_path = dir.path().join("file1");

    let client = Client::new();
    let http_client = FakeHttpClient::with_404_response();
    let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));

    let tree = Worktree::open_local(
        client,
        user_store,
        file_path.clone(),
        Arc::new(RealFs),
        Default::default(),
        &mut cx.to_async(),
    )
    .await
    .unwrap();
    cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
        .await;
    cx.read(|cx| assert_eq!(tree.read(cx).file_count(), 1));

    // The root itself is the file, so it is addressed by the empty path.
    let buffer = tree
        .update(&mut cx, |tree, cx| tree.open_buffer("", cx))
        .await
        .unwrap();
    let save = buffer.update(&mut cx, |buffer, cx| {
        buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
        buffer.save(cx).unwrap()
    });
    save.await.unwrap();

    let new_text = std::fs::read_to_string(file_path).unwrap();
    assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
}
3313
// Checks that renames and deletions on disk are picked up by a local worktree (entry
// ids and open buffers follow renamed files; a deleted file's buffer is marked
// deleted), and that a remote copy of the worktree becomes consistent with the local
// one after applying an update built from the initial snapshot.
#[gpui::test]
async fn test_rescan_and_remote_updates(mut cx: gpui::TestAppContext) {
    let dir = temp_tree(json!({
        "a": {
            "file1": "",
            "file2": "",
            "file3": "",
        },
        "b": {
            "c": {
                "file4": "",
                "file5": "",
            }
        }
    }));

    let user_id = 5;
    let mut client = Client::new();
    let server = FakeServer::for_client(user_id, &mut client, &cx).await;
    let user_store = server.build_user_store(client.clone(), &mut cx).await;
    let tree = Worktree::open_local(
        client,
        user_store.clone(),
        dir.path(),
        Arc::new(RealFs),
        Default::default(),
        &mut cx.to_async(),
    )
    .await
    .unwrap();

    // Helpers: open a buffer by path, and look up an entry id by path.
    let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| {
        let buffer = tree.update(cx, |tree, cx| tree.open_buffer(path, cx));
        async move { buffer.await.unwrap() }
    };
    let id_for_path = |path: &'static str, cx: &gpui::TestAppContext| {
        tree.read_with(cx, |tree, _| {
            tree.entry_for_path(path)
                .expect(&format!("no entry for path {}", path))
                .id
        })
    };

    let buffer2 = buffer_for_path("a/file2", &mut cx).await;
    let buffer3 = buffer_for_path("a/file3", &mut cx).await;
    let buffer4 = buffer_for_path("b/c/file4", &mut cx).await;
    let buffer5 = buffer_for_path("b/c/file5", &mut cx).await;

    // Remember the ids before anything is renamed, to compare against later.
    let file2_id = id_for_path("a/file2", &cx);
    let file3_id = id_for_path("a/file3", &cx);
    let file4_id = id_for_path("b/c/file4", &cx);

    // Wait for the initial scan.
    cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
        .await;

    // Create a remote copy of this worktree.
    let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
    let worktree_id = 1;
    let share_request = tree.update(&mut cx, |tree, cx| {
        tree.as_local().unwrap().share_request(cx)
    });
    let open_worktree = server.receive::<proto::OpenWorktree>().await.unwrap();
    server
        .respond(
            open_worktree.receipt(),
            proto::OpenWorktreeResponse { worktree_id: 1 },
        )
        .await;

    let remote = Worktree::remote(
        proto::JoinWorktreeResponse {
            worktree: share_request.await.unwrap().worktree,
            replica_id: 1,
            collaborators: Vec::new(),
        },
        Client::new(),
        user_store,
        Default::default(),
        &mut cx.to_async(),
    )
    .await
    .unwrap();

    cx.read(|cx| {
        assert!(!buffer2.read(cx).is_dirty());
        assert!(!buffer3.read(cx).is_dirty());
        assert!(!buffer4.read(cx).is_dirty());
        assert!(!buffer5.read(cx).is_dirty());
    });

    // Rename and delete files and directories.
    tree.flush_fs_events(&cx).await;
    std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
    std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
    std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap();
    std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
    tree.flush_fs_events(&cx).await;

    let expected_paths = vec![
        "a",
        "a/file1",
        "a/file2.new",
        "b",
        "d",
        "d/file3",
        "d/file4",
    ];

    cx.read(|app| {
        assert_eq!(
            tree.read(app)
                .paths()
                .map(|p| p.to_str().unwrap())
                .collect::<Vec<_>>(),
            expected_paths
        );

        // Renamed files keep their entry ids.
        assert_eq!(id_for_path("a/file2.new", &cx), file2_id);
        assert_eq!(id_for_path("d/file3", &cx), file3_id);
        assert_eq!(id_for_path("d/file4", &cx), file4_id);

        // Open buffers follow their files to the new paths; the deleted file's buffer
        // keeps its old path.
        assert_eq!(
            buffer2.read(app).file().unwrap().path().as_ref(),
            Path::new("a/file2.new")
        );
        assert_eq!(
            buffer3.read(app).file().unwrap().path().as_ref(),
            Path::new("d/file3")
        );
        assert_eq!(
            buffer4.read(app).file().unwrap().path().as_ref(),
            Path::new("d/file4")
        );
        assert_eq!(
            buffer5.read(app).file().unwrap().path().as_ref(),
            Path::new("b/c/file5")
        );

        assert!(!buffer2.read(app).file().unwrap().is_deleted());
        assert!(!buffer3.read(app).file().unwrap().is_deleted());
        assert!(!buffer4.read(app).file().unwrap().is_deleted());
        assert!(buffer5.read(app).file().unwrap().is_deleted());
    });

    // Update the remote worktree. Check that it becomes consistent with the
    // local worktree.
    remote.update(&mut cx, |remote, cx| {
        let update_message =
            tree.read(cx)
                .snapshot()
                .build_update(&initial_snapshot, worktree_id, true);
        remote
            .as_remote_mut()
            .unwrap()
            .snapshot
            .apply_update(update_message)
            .unwrap();

        assert_eq!(
            remote
                .paths()
                .map(|p| p.to_str().unwrap())
                .collect::<Vec<_>>(),
            expected_paths
        );
    });
}
3482
3483 #[gpui::test]
3484 async fn test_rescan_with_gitignore(mut cx: gpui::TestAppContext) {
3485 let dir = temp_tree(json!({
3486 ".git": {},
3487 ".gitignore": "ignored-dir\n",
3488 "tracked-dir": {
3489 "tracked-file1": "tracked contents",
3490 },
3491 "ignored-dir": {
3492 "ignored-file1": "ignored contents",
3493 }
3494 }));
3495
3496 let client = Client::new();
3497 let http_client = FakeHttpClient::with_404_response();
3498 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3499
3500 let tree = Worktree::open_local(
3501 client,
3502 user_store,
3503 dir.path(),
3504 Arc::new(RealFs),
3505 Default::default(),
3506 &mut cx.to_async(),
3507 )
3508 .await
3509 .unwrap();
3510 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3511 .await;
3512 tree.flush_fs_events(&cx).await;
3513 cx.read(|cx| {
3514 let tree = tree.read(cx);
3515 let tracked = tree.entry_for_path("tracked-dir/tracked-file1").unwrap();
3516 let ignored = tree.entry_for_path("ignored-dir/ignored-file1").unwrap();
3517 assert_eq!(tracked.is_ignored, false);
3518 assert_eq!(ignored.is_ignored, true);
3519 });
3520
3521 std::fs::write(dir.path().join("tracked-dir/tracked-file2"), "").unwrap();
3522 std::fs::write(dir.path().join("ignored-dir/ignored-file2"), "").unwrap();
3523 tree.flush_fs_events(&cx).await;
3524 cx.read(|cx| {
3525 let tree = tree.read(cx);
3526 let dot_git = tree.entry_for_path(".git").unwrap();
3527 let tracked = tree.entry_for_path("tracked-dir/tracked-file2").unwrap();
3528 let ignored = tree.entry_for_path("ignored-dir/ignored-file2").unwrap();
3529 assert_eq!(tracked.is_ignored, false);
3530 assert_eq!(ignored.is_ignored, true);
3531 assert_eq!(dot_git.is_ignored, true);
3532 });
3533 }
3534
3535 #[gpui::test]
3536 async fn test_open_and_share_worktree(mut cx: gpui::TestAppContext) {
3537 let user_id = 100;
3538 let mut client = Client::new();
3539 let server = FakeServer::for_client(user_id, &mut client, &cx).await;
3540 let user_store = server.build_user_store(client.clone(), &mut cx).await;
3541
3542 let fs = Arc::new(FakeFs::new());
3543 fs.insert_tree(
3544 "/path",
3545 json!({
3546 "to": {
3547 "the-dir": {
3548 ".zed.toml": r#"collaborators = ["friend-1", "friend-2"]"#,
3549 "a.txt": "a-contents",
3550 },
3551 },
3552 }),
3553 )
3554 .await;
3555
3556 let worktree = Worktree::open_local(
3557 client.clone(),
3558 user_store,
3559 "/path/to/the-dir".as_ref(),
3560 fs,
3561 Default::default(),
3562 &mut cx.to_async(),
3563 )
3564 .await
3565 .unwrap();
3566
3567 let open_worktree = server.receive::<proto::OpenWorktree>().await.unwrap();
3568 assert_eq!(
3569 open_worktree.payload,
3570 proto::OpenWorktree {
3571 root_name: "the-dir".to_string(),
3572 authorized_logins: vec!["friend-1".to_string(), "friend-2".to_string()],
3573 }
3574 );
3575
3576 server
3577 .respond(
3578 open_worktree.receipt(),
3579 proto::OpenWorktreeResponse { worktree_id: 5 },
3580 )
3581 .await;
3582 let remote_id = worktree
3583 .update(&mut cx, |tree, _| tree.as_local().unwrap().next_remote_id())
3584 .await;
3585 assert_eq!(remote_id, Some(5));
3586
3587 cx.update(move |_| drop(worktree));
3588 server.receive::<proto::CloseWorktree>().await.unwrap();
3589 }
3590
3591 #[gpui::test]
3592 async fn test_buffer_deduping(mut cx: gpui::TestAppContext) {
3593 let user_id = 100;
3594 let mut client = Client::new();
3595 let server = FakeServer::for_client(user_id, &mut client, &cx).await;
3596 let user_store = server.build_user_store(client.clone(), &mut cx).await;
3597
3598 let fs = Arc::new(FakeFs::new());
3599 fs.insert_tree(
3600 "/the-dir",
3601 json!({
3602 "a.txt": "a-contents",
3603 "b.txt": "b-contents",
3604 }),
3605 )
3606 .await;
3607
3608 let worktree = Worktree::open_local(
3609 client.clone(),
3610 user_store,
3611 "/the-dir".as_ref(),
3612 fs,
3613 Default::default(),
3614 &mut cx.to_async(),
3615 )
3616 .await
3617 .unwrap();
3618
3619 // Spawn multiple tasks to open paths, repeating some paths.
3620 let (buffer_a_1, buffer_b, buffer_a_2) = worktree.update(&mut cx, |worktree, cx| {
3621 (
3622 worktree.open_buffer("a.txt", cx),
3623 worktree.open_buffer("b.txt", cx),
3624 worktree.open_buffer("a.txt", cx),
3625 )
3626 });
3627
3628 let buffer_a_1 = buffer_a_1.await.unwrap();
3629 let buffer_a_2 = buffer_a_2.await.unwrap();
3630 let buffer_b = buffer_b.await.unwrap();
3631 assert_eq!(buffer_a_1.read_with(&cx, |b, _| b.text()), "a-contents");
3632 assert_eq!(buffer_b.read_with(&cx, |b, _| b.text()), "b-contents");
3633
3634 // There is only one buffer per path.
3635 let buffer_a_id = buffer_a_1.id();
3636 assert_eq!(buffer_a_2.id(), buffer_a_id);
3637
3638 // Open the same path again while it is still open.
3639 drop(buffer_a_1);
3640 let buffer_a_3 = worktree
3641 .update(&mut cx, |worktree, cx| worktree.open_buffer("a.txt", cx))
3642 .await
3643 .unwrap();
3644
3645 // There's still only one buffer per path.
3646 assert_eq!(buffer_a_3.id(), buffer_a_id);
3647 }
3648
3649 #[gpui::test]
3650 async fn test_buffer_is_dirty(mut cx: gpui::TestAppContext) {
3651 use std::fs;
3652
3653 let dir = temp_tree(json!({
3654 "file1": "abc",
3655 "file2": "def",
3656 "file3": "ghi",
3657 }));
3658 let client = Client::new();
3659 let http_client = FakeHttpClient::with_404_response();
3660 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3661
3662 let tree = Worktree::open_local(
3663 client,
3664 user_store,
3665 dir.path(),
3666 Arc::new(RealFs),
3667 Default::default(),
3668 &mut cx.to_async(),
3669 )
3670 .await
3671 .unwrap();
3672 tree.flush_fs_events(&cx).await;
3673 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3674 .await;
3675
3676 let buffer1 = tree
3677 .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
3678 .await
3679 .unwrap();
3680 let events = Rc::new(RefCell::new(Vec::new()));
3681
3682 // initially, the buffer isn't dirty.
3683 buffer1.update(&mut cx, |buffer, cx| {
3684 cx.subscribe(&buffer1, {
3685 let events = events.clone();
3686 move |_, _, event, _| events.borrow_mut().push(event.clone())
3687 })
3688 .detach();
3689
3690 assert!(!buffer.is_dirty());
3691 assert!(events.borrow().is_empty());
3692
3693 buffer.edit(vec![1..2], "", cx);
3694 });
3695
3696 // after the first edit, the buffer is dirty, and emits a dirtied event.
3697 buffer1.update(&mut cx, |buffer, cx| {
3698 assert!(buffer.text() == "ac");
3699 assert!(buffer.is_dirty());
3700 assert_eq!(
3701 *events.borrow(),
3702 &[language::Event::Edited, language::Event::Dirtied]
3703 );
3704 events.borrow_mut().clear();
3705 buffer.did_save(buffer.version(), buffer.file().unwrap().mtime(), None, cx);
3706 });
3707
3708 // after saving, the buffer is not dirty, and emits a saved event.
3709 buffer1.update(&mut cx, |buffer, cx| {
3710 assert!(!buffer.is_dirty());
3711 assert_eq!(*events.borrow(), &[language::Event::Saved]);
3712 events.borrow_mut().clear();
3713
3714 buffer.edit(vec![1..1], "B", cx);
3715 buffer.edit(vec![2..2], "D", cx);
3716 });
3717
3718 // after editing again, the buffer is dirty, and emits another dirty event.
3719 buffer1.update(&mut cx, |buffer, cx| {
3720 assert!(buffer.text() == "aBDc");
3721 assert!(buffer.is_dirty());
3722 assert_eq!(
3723 *events.borrow(),
3724 &[
3725 language::Event::Edited,
3726 language::Event::Dirtied,
3727 language::Event::Edited,
3728 ],
3729 );
3730 events.borrow_mut().clear();
3731
3732 // TODO - currently, after restoring the buffer to its
3733 // previously-saved state, the is still considered dirty.
3734 buffer.edit([1..3], "", cx);
3735 assert!(buffer.text() == "ac");
3736 assert!(buffer.is_dirty());
3737 });
3738
3739 assert_eq!(*events.borrow(), &[language::Event::Edited]);
3740
3741 // When a file is deleted, the buffer is considered dirty.
3742 let events = Rc::new(RefCell::new(Vec::new()));
3743 let buffer2 = tree
3744 .update(&mut cx, |tree, cx| tree.open_buffer("file2", cx))
3745 .await
3746 .unwrap();
3747 buffer2.update(&mut cx, |_, cx| {
3748 cx.subscribe(&buffer2, {
3749 let events = events.clone();
3750 move |_, _, event, _| events.borrow_mut().push(event.clone())
3751 })
3752 .detach();
3753 });
3754
3755 fs::remove_file(dir.path().join("file2")).unwrap();
3756 buffer2.condition(&cx, |b, _| b.is_dirty()).await;
3757 assert_eq!(
3758 *events.borrow(),
3759 &[language::Event::Dirtied, language::Event::FileHandleChanged]
3760 );
3761
3762 // When a file is already dirty when deleted, we don't emit a Dirtied event.
3763 let events = Rc::new(RefCell::new(Vec::new()));
3764 let buffer3 = tree
3765 .update(&mut cx, |tree, cx| tree.open_buffer("file3", cx))
3766 .await
3767 .unwrap();
3768 buffer3.update(&mut cx, |_, cx| {
3769 cx.subscribe(&buffer3, {
3770 let events = events.clone();
3771 move |_, _, event, _| events.borrow_mut().push(event.clone())
3772 })
3773 .detach();
3774 });
3775
3776 tree.flush_fs_events(&cx).await;
3777 buffer3.update(&mut cx, |buffer, cx| {
3778 buffer.edit(Some(0..0), "x", cx);
3779 });
3780 events.borrow_mut().clear();
3781 fs::remove_file(dir.path().join("file3")).unwrap();
3782 buffer3
3783 .condition(&cx, |_, _| !events.borrow().is_empty())
3784 .await;
3785 assert_eq!(*events.borrow(), &[language::Event::FileHandleChanged]);
3786 cx.read(|cx| assert!(buffer3.read(cx).is_dirty()));
3787 }
3788
3789 #[gpui::test]
3790 async fn test_buffer_file_changes_on_disk(mut cx: gpui::TestAppContext) {
3791 use std::fs;
3792
3793 let initial_contents = "aaa\nbbbbb\nc\n";
3794 let dir = temp_tree(json!({ "the-file": initial_contents }));
3795 let client = Client::new();
3796 let http_client = FakeHttpClient::with_404_response();
3797 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3798
3799 let tree = Worktree::open_local(
3800 client,
3801 user_store,
3802 dir.path(),
3803 Arc::new(RealFs),
3804 Default::default(),
3805 &mut cx.to_async(),
3806 )
3807 .await
3808 .unwrap();
3809 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3810 .await;
3811
3812 let abs_path = dir.path().join("the-file");
3813 let buffer = tree
3814 .update(&mut cx, |tree, cx| {
3815 tree.open_buffer(Path::new("the-file"), cx)
3816 })
3817 .await
3818 .unwrap();
3819
3820 // TODO
3821 // Add a cursor on each row.
3822 // let selection_set_id = buffer.update(&mut cx, |buffer, cx| {
3823 // assert!(!buffer.is_dirty());
3824 // buffer.add_selection_set(
3825 // &(0..3)
3826 // .map(|row| Selection {
3827 // id: row as usize,
3828 // start: Point::new(row, 1),
3829 // end: Point::new(row, 1),
3830 // reversed: false,
3831 // goal: SelectionGoal::None,
3832 // })
3833 // .collect::<Vec<_>>(),
3834 // cx,
3835 // )
3836 // });
3837
3838 // Change the file on disk, adding two new lines of text, and removing
3839 // one line.
3840 buffer.read_with(&cx, |buffer, _| {
3841 assert!(!buffer.is_dirty());
3842 assert!(!buffer.has_conflict());
3843 });
3844 let new_contents = "AAAA\naaa\nBB\nbbbbb\n";
3845 fs::write(&abs_path, new_contents).unwrap();
3846
3847 // Because the buffer was not modified, it is reloaded from disk. Its
3848 // contents are edited according to the diff between the old and new
3849 // file contents.
3850 buffer
3851 .condition(&cx, |buffer, _| buffer.text() == new_contents)
3852 .await;
3853
3854 buffer.update(&mut cx, |buffer, _| {
3855 assert_eq!(buffer.text(), new_contents);
3856 assert!(!buffer.is_dirty());
3857 assert!(!buffer.has_conflict());
3858
3859 // TODO
3860 // let cursor_positions = buffer
3861 // .selection_set(selection_set_id)
3862 // .unwrap()
3863 // .selections::<Point>(&*buffer)
3864 // .map(|selection| {
3865 // assert_eq!(selection.start, selection.end);
3866 // selection.start
3867 // })
3868 // .collect::<Vec<_>>();
3869 // assert_eq!(
3870 // cursor_positions,
3871 // [Point::new(1, 1), Point::new(3, 1), Point::new(4, 0)]
3872 // );
3873 });
3874
3875 // Modify the buffer
3876 buffer.update(&mut cx, |buffer, cx| {
3877 buffer.edit(vec![0..0], " ", cx);
3878 assert!(buffer.is_dirty());
3879 assert!(!buffer.has_conflict());
3880 });
3881
3882 // Change the file on disk again, adding blank lines to the beginning.
3883 fs::write(&abs_path, "\n\n\nAAAA\naaa\nBB\nbbbbb\n").unwrap();
3884
3885 // Because the buffer is modified, it doesn't reload from disk, but is
3886 // marked as having a conflict.
3887 buffer
3888 .condition(&cx, |buffer, _| buffer.has_conflict())
3889 .await;
3890 }
3891
3892 #[gpui::test]
3893 async fn test_language_server_diagnostics(mut cx: gpui::TestAppContext) {
3894 let (language_server_config, mut fake_server) =
3895 LanguageServerConfig::fake(cx.background()).await;
3896 let mut languages = LanguageRegistry::new();
3897 languages.add(Arc::new(Language::new(
3898 LanguageConfig {
3899 name: "Rust".to_string(),
3900 path_suffixes: vec!["rs".to_string()],
3901 language_server: Some(language_server_config),
3902 ..Default::default()
3903 },
3904 Some(tree_sitter_rust::language()),
3905 )));
3906
3907 let dir = temp_tree(json!({
3908 "a.rs": "fn a() { A }",
3909 "b.rs": "const y: i32 = 1",
3910 }));
3911
3912 let client = Client::new();
3913 let http_client = FakeHttpClient::with_404_response();
3914 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3915
3916 let tree = Worktree::open_local(
3917 client,
3918 user_store,
3919 dir.path(),
3920 Arc::new(RealFs),
3921 Arc::new(languages),
3922 &mut cx.to_async(),
3923 )
3924 .await
3925 .unwrap();
3926 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3927 .await;
3928
3929 // Cause worktree to start the fake language server
3930 let _buffer = tree
3931 .update(&mut cx, |tree, cx| tree.open_buffer("b.rs", cx))
3932 .await
3933 .unwrap();
3934
3935 fake_server
3936 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
3937 uri: Url::from_file_path(dir.path().join("a.rs")).unwrap(),
3938 version: None,
3939 diagnostics: vec![lsp::Diagnostic {
3940 range: lsp::Range::new(lsp::Position::new(0, 9), lsp::Position::new(0, 10)),
3941 severity: Some(lsp::DiagnosticSeverity::ERROR),
3942 message: "undefined variable 'A'".to_string(),
3943 ..Default::default()
3944 }],
3945 })
3946 .await;
3947
3948 let buffer = tree
3949 .update(&mut cx, |tree, cx| tree.open_buffer("a.rs", cx))
3950 .await
3951 .unwrap();
3952
3953 buffer.read_with(&cx, |buffer, _| {
3954 let diagnostics = buffer
3955 .snapshot()
3956 .diagnostics_in_range::<_, Point>(0..buffer.len())
3957 .collect::<Vec<_>>();
3958 assert_eq!(
3959 diagnostics,
3960 &[DiagnosticEntry {
3961 range: Point::new(0, 9)..Point::new(0, 10),
3962 diagnostic: Diagnostic {
3963 severity: lsp::DiagnosticSeverity::ERROR,
3964 message: "undefined variable 'A'".to_string(),
3965 group_id: 0,
3966 is_primary: true,
3967 ..Default::default()
3968 }
3969 }]
3970 )
3971 });
3972 }
3973
3974 #[gpui::test]
3975 async fn test_grouped_diagnostics(mut cx: gpui::TestAppContext) {
3976 let fs = Arc::new(FakeFs::new());
3977 let client = Client::new();
3978 let http_client = FakeHttpClient::with_404_response();
3979 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3980
3981 fs.insert_tree(
3982 "/the-dir",
3983 json!({
3984 "a.rs": "
3985 fn foo(mut v: Vec<usize>) {
3986 for x in &v {
3987 v.push(1);
3988 }
3989 }
3990 "
3991 .unindent(),
3992 }),
3993 )
3994 .await;
3995
3996 let worktree = Worktree::open_local(
3997 client.clone(),
3998 user_store,
3999 "/the-dir".as_ref(),
4000 fs,
4001 Default::default(),
4002 &mut cx.to_async(),
4003 )
4004 .await
4005 .unwrap();
4006
4007 let buffer = worktree
4008 .update(&mut cx, |tree, cx| tree.open_buffer("a.rs", cx))
4009 .await
4010 .unwrap();
4011
4012 let buffer_uri = Url::from_file_path("/the-dir/a.rs").unwrap();
4013 let message = lsp::PublishDiagnosticsParams {
4014 uri: buffer_uri.clone(),
4015 diagnostics: vec![
4016 lsp::Diagnostic {
4017 range: lsp::Range::new(lsp::Position::new(1, 8), lsp::Position::new(1, 9)),
4018 severity: Some(DiagnosticSeverity::WARNING),
4019 message: "error 1".to_string(),
4020 related_information: Some(vec![lsp::DiagnosticRelatedInformation {
4021 location: lsp::Location {
4022 uri: buffer_uri.clone(),
4023 range: lsp::Range::new(
4024 lsp::Position::new(1, 8),
4025 lsp::Position::new(1, 9),
4026 ),
4027 },
4028 message: "error 1 hint 1".to_string(),
4029 }]),
4030 ..Default::default()
4031 },
4032 lsp::Diagnostic {
4033 range: lsp::Range::new(lsp::Position::new(1, 8), lsp::Position::new(1, 9)),
4034 severity: Some(DiagnosticSeverity::HINT),
4035 message: "error 1 hint 1".to_string(),
4036 related_information: Some(vec![lsp::DiagnosticRelatedInformation {
4037 location: lsp::Location {
4038 uri: buffer_uri.clone(),
4039 range: lsp::Range::new(
4040 lsp::Position::new(1, 8),
4041 lsp::Position::new(1, 9),
4042 ),
4043 },
4044 message: "original diagnostic".to_string(),
4045 }]),
4046 ..Default::default()
4047 },
4048 lsp::Diagnostic {
4049 range: lsp::Range::new(lsp::Position::new(2, 8), lsp::Position::new(2, 17)),
4050 severity: Some(DiagnosticSeverity::ERROR),
4051 message: "error 2".to_string(),
4052 related_information: Some(vec![
4053 lsp::DiagnosticRelatedInformation {
4054 location: lsp::Location {
4055 uri: buffer_uri.clone(),
4056 range: lsp::Range::new(
4057 lsp::Position::new(1, 13),
4058 lsp::Position::new(1, 15),
4059 ),
4060 },
4061 message: "error 2 hint 1".to_string(),
4062 },
4063 lsp::DiagnosticRelatedInformation {
4064 location: lsp::Location {
4065 uri: buffer_uri.clone(),
4066 range: lsp::Range::new(
4067 lsp::Position::new(1, 13),
4068 lsp::Position::new(1, 15),
4069 ),
4070 },
4071 message: "error 2 hint 2".to_string(),
4072 },
4073 ]),
4074 ..Default::default()
4075 },
4076 lsp::Diagnostic {
4077 range: lsp::Range::new(lsp::Position::new(1, 13), lsp::Position::new(1, 15)),
4078 severity: Some(DiagnosticSeverity::HINT),
4079 message: "error 2 hint 1".to_string(),
4080 related_information: Some(vec![lsp::DiagnosticRelatedInformation {
4081 location: lsp::Location {
4082 uri: buffer_uri.clone(),
4083 range: lsp::Range::new(
4084 lsp::Position::new(2, 8),
4085 lsp::Position::new(2, 17),
4086 ),
4087 },
4088 message: "original diagnostic".to_string(),
4089 }]),
4090 ..Default::default()
4091 },
4092 lsp::Diagnostic {
4093 range: lsp::Range::new(lsp::Position::new(1, 13), lsp::Position::new(1, 15)),
4094 severity: Some(DiagnosticSeverity::HINT),
4095 message: "error 2 hint 2".to_string(),
4096 related_information: Some(vec![lsp::DiagnosticRelatedInformation {
4097 location: lsp::Location {
4098 uri: buffer_uri.clone(),
4099 range: lsp::Range::new(
4100 lsp::Position::new(2, 8),
4101 lsp::Position::new(2, 17),
4102 ),
4103 },
4104 message: "original diagnostic".to_string(),
4105 }]),
4106 ..Default::default()
4107 },
4108 ],
4109 version: None,
4110 };
4111
4112 worktree
4113 .update(&mut cx, |tree, cx| tree.update_diagnostics(message, cx))
4114 .unwrap();
4115 let buffer = buffer.read_with(&cx, |buffer, _| buffer.snapshot());
4116
4117 assert_eq!(
4118 buffer
4119 .diagnostics_in_range::<_, Point>(0..buffer.len())
4120 .collect::<Vec<_>>(),
4121 &[
4122 DiagnosticEntry {
4123 range: Point::new(1, 8)..Point::new(1, 9),
4124 diagnostic: Diagnostic {
4125 severity: DiagnosticSeverity::WARNING,
4126 message: "error 1".to_string(),
4127 group_id: 0,
4128 is_primary: true,
4129 ..Default::default()
4130 }
4131 },
4132 DiagnosticEntry {
4133 range: Point::new(1, 8)..Point::new(1, 9),
4134 diagnostic: Diagnostic {
4135 severity: DiagnosticSeverity::HINT,
4136 message: "error 1 hint 1".to_string(),
4137 group_id: 0,
4138 is_primary: false,
4139 ..Default::default()
4140 }
4141 },
4142 DiagnosticEntry {
4143 range: Point::new(1, 13)..Point::new(1, 15),
4144 diagnostic: Diagnostic {
4145 severity: DiagnosticSeverity::HINT,
4146 message: "error 2 hint 1".to_string(),
4147 group_id: 1,
4148 is_primary: false,
4149 ..Default::default()
4150 }
4151 },
4152 DiagnosticEntry {
4153 range: Point::new(1, 13)..Point::new(1, 15),
4154 diagnostic: Diagnostic {
4155 severity: DiagnosticSeverity::HINT,
4156 message: "error 2 hint 2".to_string(),
4157 group_id: 1,
4158 is_primary: false,
4159 ..Default::default()
4160 }
4161 },
4162 DiagnosticEntry {
4163 range: Point::new(2, 8)..Point::new(2, 17),
4164 diagnostic: Diagnostic {
4165 severity: DiagnosticSeverity::ERROR,
4166 message: "error 2".to_string(),
4167 group_id: 1,
4168 is_primary: true,
4169 ..Default::default()
4170 }
4171 }
4172 ]
4173 );
4174
4175 assert_eq!(
4176 buffer.diagnostic_group::<Point>(0).collect::<Vec<_>>(),
4177 &[
4178 DiagnosticEntry {
4179 range: Point::new(1, 8)..Point::new(1, 9),
4180 diagnostic: Diagnostic {
4181 severity: DiagnosticSeverity::WARNING,
4182 message: "error 1".to_string(),
4183 group_id: 0,
4184 is_primary: true,
4185 ..Default::default()
4186 }
4187 },
4188 DiagnosticEntry {
4189 range: Point::new(1, 8)..Point::new(1, 9),
4190 diagnostic: Diagnostic {
4191 severity: DiagnosticSeverity::HINT,
4192 message: "error 1 hint 1".to_string(),
4193 group_id: 0,
4194 is_primary: false,
4195 ..Default::default()
4196 }
4197 },
4198 ]
4199 );
4200 assert_eq!(
4201 buffer.diagnostic_group::<Point>(1).collect::<Vec<_>>(),
4202 &[
4203 DiagnosticEntry {
4204 range: Point::new(1, 13)..Point::new(1, 15),
4205 diagnostic: Diagnostic {
4206 severity: DiagnosticSeverity::HINT,
4207 message: "error 2 hint 1".to_string(),
4208 group_id: 1,
4209 is_primary: false,
4210 ..Default::default()
4211 }
4212 },
4213 DiagnosticEntry {
4214 range: Point::new(1, 13)..Point::new(1, 15),
4215 diagnostic: Diagnostic {
4216 severity: DiagnosticSeverity::HINT,
4217 message: "error 2 hint 2".to_string(),
4218 group_id: 1,
4219 is_primary: false,
4220 ..Default::default()
4221 }
4222 },
4223 DiagnosticEntry {
4224 range: Point::new(2, 8)..Point::new(2, 17),
4225 diagnostic: Diagnostic {
4226 severity: DiagnosticSeverity::ERROR,
4227 message: "error 2".to_string(),
4228 group_id: 1,
4229 is_primary: true,
4230 ..Default::default()
4231 }
4232 }
4233 ]
4234 );
4235 }
4236
4237 #[gpui::test(iterations = 100)]
4238 fn test_random(mut rng: StdRng) {
4239 let operations = env::var("OPERATIONS")
4240 .map(|o| o.parse().unwrap())
4241 .unwrap_or(40);
4242 let initial_entries = env::var("INITIAL_ENTRIES")
4243 .map(|o| o.parse().unwrap())
4244 .unwrap_or(20);
4245
4246 let root_dir = tempdir::TempDir::new("worktree-test").unwrap();
4247 for _ in 0..initial_entries {
4248 randomly_mutate_tree(root_dir.path(), 1.0, &mut rng).unwrap();
4249 }
4250 log::info!("Generated initial tree");
4251
4252 let (notify_tx, _notify_rx) = smol::channel::unbounded();
4253 let fs = Arc::new(RealFs);
4254 let next_entry_id = Arc::new(AtomicUsize::new(0));
4255 let mut initial_snapshot = Snapshot {
4256 id: 0,
4257 scan_id: 0,
4258 abs_path: root_dir.path().into(),
4259 entries_by_path: Default::default(),
4260 entries_by_id: Default::default(),
4261 removed_entry_ids: Default::default(),
4262 ignores: Default::default(),
4263 root_name: Default::default(),
4264 root_char_bag: Default::default(),
4265 next_entry_id: next_entry_id.clone(),
4266 };
4267 initial_snapshot.insert_entry(
4268 Entry::new(
4269 Path::new("").into(),
4270 &smol::block_on(fs.metadata(root_dir.path()))
4271 .unwrap()
4272 .unwrap(),
4273 &next_entry_id,
4274 Default::default(),
4275 ),
4276 fs.as_ref(),
4277 );
4278 let mut scanner = BackgroundScanner::new(
4279 Arc::new(Mutex::new(initial_snapshot.clone())),
4280 notify_tx,
4281 fs.clone(),
4282 Arc::new(gpui::executor::Background::new()),
4283 );
4284 smol::block_on(scanner.scan_dirs()).unwrap();
4285 scanner.snapshot().check_invariants();
4286
4287 let mut events = Vec::new();
4288 let mut snapshots = Vec::new();
4289 let mut mutations_len = operations;
4290 while mutations_len > 1 {
4291 if !events.is_empty() && rng.gen_bool(0.4) {
4292 let len = rng.gen_range(0..=events.len());
4293 let to_deliver = events.drain(0..len).collect::<Vec<_>>();
4294 log::info!("Delivering events: {:#?}", to_deliver);
4295 smol::block_on(scanner.process_events(to_deliver));
4296 scanner.snapshot().check_invariants();
4297 } else {
4298 events.extend(randomly_mutate_tree(root_dir.path(), 0.6, &mut rng).unwrap());
4299 mutations_len -= 1;
4300 }
4301
4302 if rng.gen_bool(0.2) {
4303 snapshots.push(scanner.snapshot());
4304 }
4305 }
4306 log::info!("Quiescing: {:#?}", events);
4307 smol::block_on(scanner.process_events(events));
4308 scanner.snapshot().check_invariants();
4309
4310 let (notify_tx, _notify_rx) = smol::channel::unbounded();
4311 let mut new_scanner = BackgroundScanner::new(
4312 Arc::new(Mutex::new(initial_snapshot)),
4313 notify_tx,
4314 scanner.fs.clone(),
4315 scanner.executor.clone(),
4316 );
4317 smol::block_on(new_scanner.scan_dirs()).unwrap();
4318 assert_eq!(
4319 scanner.snapshot().to_vec(true),
4320 new_scanner.snapshot().to_vec(true)
4321 );
4322
4323 for mut prev_snapshot in snapshots {
4324 let include_ignored = rng.gen::<bool>();
4325 if !include_ignored {
4326 let mut entries_by_path_edits = Vec::new();
4327 let mut entries_by_id_edits = Vec::new();
4328 for entry in prev_snapshot
4329 .entries_by_id
4330 .cursor::<()>()
4331 .filter(|e| e.is_ignored)
4332 {
4333 entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
4334 entries_by_id_edits.push(Edit::Remove(entry.id));
4335 }
4336
4337 prev_snapshot
4338 .entries_by_path
4339 .edit(entries_by_path_edits, &());
4340 prev_snapshot.entries_by_id.edit(entries_by_id_edits, &());
4341 }
4342
4343 let update = scanner
4344 .snapshot()
4345 .build_update(&prev_snapshot, 0, include_ignored);
4346 prev_snapshot.apply_update(update).unwrap();
4347 assert_eq!(
4348 prev_snapshot.to_vec(true),
4349 scanner.snapshot().to_vec(include_ignored)
4350 );
4351 }
4352 }
4353
4354 fn randomly_mutate_tree(
4355 root_path: &Path,
4356 insertion_probability: f64,
4357 rng: &mut impl Rng,
4358 ) -> Result<Vec<fsevent::Event>> {
4359 let root_path = root_path.canonicalize().unwrap();
4360 let (dirs, files) = read_dir_recursive(root_path.clone());
4361
4362 let mut events = Vec::new();
4363 let mut record_event = |path: PathBuf| {
4364 events.push(fsevent::Event {
4365 event_id: SystemTime::now()
4366 .duration_since(UNIX_EPOCH)
4367 .unwrap()
4368 .as_secs(),
4369 flags: fsevent::StreamFlags::empty(),
4370 path,
4371 });
4372 };
4373
4374 if (files.is_empty() && dirs.len() == 1) || rng.gen_bool(insertion_probability) {
4375 let path = dirs.choose(rng).unwrap();
4376 let new_path = path.join(gen_name(rng));
4377
4378 if rng.gen() {
4379 log::info!("Creating dir {:?}", new_path.strip_prefix(root_path)?);
4380 std::fs::create_dir(&new_path)?;
4381 } else {
4382 log::info!("Creating file {:?}", new_path.strip_prefix(root_path)?);
4383 std::fs::write(&new_path, "")?;
4384 }
4385 record_event(new_path);
4386 } else if rng.gen_bool(0.05) {
4387 let ignore_dir_path = dirs.choose(rng).unwrap();
4388 let ignore_path = ignore_dir_path.join(&*GITIGNORE);
4389
4390 let (subdirs, subfiles) = read_dir_recursive(ignore_dir_path.clone());
4391 let files_to_ignore = {
4392 let len = rng.gen_range(0..=subfiles.len());
4393 subfiles.choose_multiple(rng, len)
4394 };
4395 let dirs_to_ignore = {
4396 let len = rng.gen_range(0..subdirs.len());
4397 subdirs.choose_multiple(rng, len)
4398 };
4399
4400 let mut ignore_contents = String::new();
4401 for path_to_ignore in files_to_ignore.chain(dirs_to_ignore) {
4402 write!(
4403 ignore_contents,
4404 "{}\n",
4405 path_to_ignore
4406 .strip_prefix(&ignore_dir_path)?
4407 .to_str()
4408 .unwrap()
4409 )
4410 .unwrap();
4411 }
4412 log::info!(
4413 "Creating {:?} with contents:\n{}",
4414 ignore_path.strip_prefix(&root_path)?,
4415 ignore_contents
4416 );
4417 std::fs::write(&ignore_path, ignore_contents).unwrap();
4418 record_event(ignore_path);
4419 } else {
4420 let old_path = {
4421 let file_path = files.choose(rng);
4422 let dir_path = dirs[1..].choose(rng);
4423 file_path.into_iter().chain(dir_path).choose(rng).unwrap()
4424 };
4425
4426 let is_rename = rng.gen();
4427 if is_rename {
4428 let new_path_parent = dirs
4429 .iter()
4430 .filter(|d| !d.starts_with(old_path))
4431 .choose(rng)
4432 .unwrap();
4433
4434 let overwrite_existing_dir =
4435 !old_path.starts_with(&new_path_parent) && rng.gen_bool(0.3);
4436 let new_path = if overwrite_existing_dir {
4437 std::fs::remove_dir_all(&new_path_parent).ok();
4438 new_path_parent.to_path_buf()
4439 } else {
4440 new_path_parent.join(gen_name(rng))
4441 };
4442
4443 log::info!(
4444 "Renaming {:?} to {}{:?}",
4445 old_path.strip_prefix(&root_path)?,
4446 if overwrite_existing_dir {
4447 "overwrite "
4448 } else {
4449 ""
4450 },
4451 new_path.strip_prefix(&root_path)?
4452 );
4453 std::fs::rename(&old_path, &new_path)?;
4454 record_event(old_path.clone());
4455 record_event(new_path);
4456 } else if old_path.is_dir() {
4457 let (dirs, files) = read_dir_recursive(old_path.clone());
4458
4459 log::info!("Deleting dir {:?}", old_path.strip_prefix(&root_path)?);
4460 std::fs::remove_dir_all(&old_path).unwrap();
4461 for file in files {
4462 record_event(file);
4463 }
4464 for dir in dirs {
4465 record_event(dir);
4466 }
4467 } else {
4468 log::info!("Deleting file {:?}", old_path.strip_prefix(&root_path)?);
4469 std::fs::remove_file(old_path).unwrap();
4470 record_event(old_path.clone());
4471 }
4472 }
4473
4474 Ok(events)
4475 }
4476
/// Recursively lists everything beneath `path`.
///
/// Returns `(dirs, files)`. `dirs` always starts with `path` itself, followed
/// by every nested directory; `files` holds every non-directory entry.
///
/// `std::fs::read_dir` yields entries in a platform- and filesystem-dependent
/// order, so the children of each directory are sorted before descending.
/// This keeps the result (and therefore any seeded random choice made from
/// it) reproducible from run to run.
///
/// # Panics
///
/// Panics if a directory cannot be read.
fn read_dir_recursive(path: PathBuf) -> (Vec<PathBuf>, Vec<PathBuf>) {
    let mut child_paths = std::fs::read_dir(&path)
        .unwrap()
        .map(|child_entry| child_entry.unwrap().path())
        .collect::<Vec<_>>();
    child_paths.sort();

    let mut dirs = vec![path];
    let mut files = Vec::new();
    for child_path in child_paths {
        if child_path.is_dir() {
            let (child_dirs, child_files) = read_dir_recursive(child_path);
            dirs.extend(child_dirs);
            files.extend(child_files);
        } else {
            files.push(child_path);
        }
    }
    (dirs, files)
}
4493
4494 fn gen_name(rng: &mut impl Rng) -> String {
4495 (0..6)
4496 .map(|_| rng.sample(rand::distributions::Alphanumeric))
4497 .map(char::from)
4498 .collect()
4499 }
4500
4501 impl Snapshot {
4502 fn check_invariants(&self) {
4503 let mut files = self.files(true, 0);
4504 let mut visible_files = self.files(false, 0);
4505 for entry in self.entries_by_path.cursor::<()>() {
4506 if entry.is_file() {
4507 assert_eq!(files.next().unwrap().inode, entry.inode);
4508 if !entry.is_ignored {
4509 assert_eq!(visible_files.next().unwrap().inode, entry.inode);
4510 }
4511 }
4512 }
4513 assert!(files.next().is_none());
4514 assert!(visible_files.next().is_none());
4515
4516 let mut bfs_paths = Vec::new();
4517 let mut stack = vec![Path::new("")];
4518 while let Some(path) = stack.pop() {
4519 bfs_paths.push(path);
4520 let ix = stack.len();
4521 for child_entry in self.child_entries(path) {
4522 stack.insert(ix, &child_entry.path);
4523 }
4524 }
4525
4526 let dfs_paths = self
4527 .entries_by_path
4528 .cursor::<()>()
4529 .map(|e| e.path.as_ref())
4530 .collect::<Vec<_>>();
4531 assert_eq!(bfs_paths, dfs_paths);
4532
4533 for (ignore_parent_path, _) in &self.ignores {
4534 assert!(self.entry_for_path(ignore_parent_path).is_some());
4535 assert!(self
4536 .entry_for_path(ignore_parent_path.join(&*GITIGNORE))
4537 .is_some());
4538 }
4539 }
4540
4541 fn to_vec(&self, include_ignored: bool) -> Vec<(&Path, u64, bool)> {
4542 let mut paths = Vec::new();
4543 for entry in self.entries_by_path.cursor::<()>() {
4544 if include_ignored || !entry.is_ignored {
4545 paths.push((entry.path.as_ref(), entry.inode, entry.is_ignored));
4546 }
4547 }
4548 paths.sort_by(|a, b| a.0.cmp(&b.0));
4549 paths
4550 }
4551 }
4552}