1use super::{
2 fs::{self, Fs},
3 ignore::IgnoreStack,
4 DiagnosticSummary,
5};
6use ::ignore::gitignore::{Gitignore, GitignoreBuilder};
7use anyhow::{anyhow, Context, Result};
8use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
9use clock::ReplicaId;
10use collections::{hash_map, HashMap};
11use futures::{Stream, StreamExt};
12use fuzzy::CharBag;
13use gpui::{
14 executor, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
15 Task, UpgradeModelHandle, WeakModelHandle,
16};
17use language::{
18 Buffer, Diagnostic, DiagnosticEntry, DiagnosticSeverity, File as _, Language, LanguageRegistry,
19 Operation, PointUtf16, Rope,
20};
21use lazy_static::lazy_static;
22use lsp::LanguageServer;
23use parking_lot::Mutex;
24use postage::{
25 prelude::{Sink as _, Stream as _},
26 watch,
27};
28use serde::Deserialize;
29use smol::channel::{self, Sender};
30use std::{
31 any::Any,
32 cmp::{self, Ordering},
33 convert::{TryFrom, TryInto},
34 ffi::{OsStr, OsString},
35 fmt,
36 future::Future,
37 ops::{Deref, Range},
38 path::{Path, PathBuf},
39 sync::{
40 atomic::{AtomicUsize, Ordering::SeqCst},
41 Arc,
42 },
43 time::{Duration, SystemTime},
44};
45use sum_tree::Bias;
46use sum_tree::{Edit, SeekTarget, SumTree};
47use util::{post_inc, ResultExt, TryFutureExt};
48
49lazy_static! {
50 static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
51}
52
53#[derive(Clone, Debug)]
54enum ScanState {
55 Idle,
56 Scanning,
57 Err(Arc<anyhow::Error>),
58}
59
60pub enum Worktree {
61 Local(LocalWorktree),
62 Remote(RemoteWorktree),
63}
64
65pub enum Event {
66 Closed,
67}
68
69#[derive(Clone, Debug)]
70pub struct Collaborator {
71 pub user: Arc<User>,
72 pub peer_id: PeerId,
73 pub replica_id: ReplicaId,
74}
75
76impl Collaborator {
77 fn from_proto(
78 message: proto::Collaborator,
79 user_store: &ModelHandle<UserStore>,
80 cx: &mut AsyncAppContext,
81 ) -> impl Future<Output = Result<Self>> {
82 let user = user_store.update(cx, |user_store, cx| {
83 user_store.fetch_user(message.user_id, cx)
84 });
85
86 async move {
87 Ok(Self {
88 peer_id: PeerId(message.peer_id),
89 user: user.await?,
90 replica_id: message.replica_id as ReplicaId,
91 })
92 }
93 }
94}
95
96impl Entity for Worktree {
97 type Event = Event;
98
99 fn release(&mut self, cx: &mut MutableAppContext) {
100 match self {
101 Self::Local(tree) => {
102 if let Some(worktree_id) = *tree.remote_id.borrow() {
103 let rpc = tree.client.clone();
104 cx.spawn(|_| async move {
105 if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await {
106 log::error!("error closing worktree: {}", err);
107 }
108 })
109 .detach();
110 }
111 }
112 Self::Remote(tree) => {
113 let rpc = tree.client.clone();
114 let worktree_id = tree.remote_id;
115 cx.spawn(|_| async move {
116 if let Err(err) = rpc.send(proto::LeaveWorktree { worktree_id }).await {
117 log::error!("error closing worktree: {}", err);
118 }
119 })
120 .detach();
121 }
122 }
123 }
124
125 fn app_will_quit(
126 &mut self,
127 _: &mut MutableAppContext,
128 ) -> Option<std::pin::Pin<Box<dyn 'static + Future<Output = ()>>>> {
129 use futures::FutureExt;
130
131 if let Self::Local(worktree) = self {
132 let shutdown_futures = worktree
133 .language_servers
134 .drain()
135 .filter_map(|(_, server)| server.shutdown())
136 .collect::<Vec<_>>();
137 Some(
138 async move {
139 futures::future::join_all(shutdown_futures).await;
140 }
141 .boxed(),
142 )
143 } else {
144 None
145 }
146 }
147}
148
149impl Worktree {
150 pub async fn open_local(
151 client: Arc<Client>,
152 user_store: ModelHandle<UserStore>,
153 path: impl Into<Arc<Path>>,
154 fs: Arc<dyn Fs>,
155 languages: Arc<LanguageRegistry>,
156 cx: &mut AsyncAppContext,
157 ) -> Result<ModelHandle<Self>> {
158 let (tree, scan_states_tx) =
159 LocalWorktree::new(client, user_store, path, fs.clone(), languages, cx).await?;
160 tree.update(cx, |tree, cx| {
161 let tree = tree.as_local_mut().unwrap();
162 let abs_path = tree.snapshot.abs_path.clone();
163 let background_snapshot = tree.background_snapshot.clone();
164 let background = cx.background().clone();
165 tree._background_scanner_task = Some(cx.background().spawn(async move {
166 let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
167 let scanner =
168 BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
169 scanner.run(events).await;
170 }));
171 });
172 Ok(tree)
173 }
174
175 pub async fn open_remote(
176 client: Arc<Client>,
177 id: u64,
178 languages: Arc<LanguageRegistry>,
179 user_store: ModelHandle<UserStore>,
180 cx: &mut AsyncAppContext,
181 ) -> Result<ModelHandle<Self>> {
182 let response = client
183 .request(proto::JoinWorktree { worktree_id: id })
184 .await?;
185 Worktree::remote(response, client, user_store, languages, cx).await
186 }
187
188 async fn remote(
189 join_response: proto::JoinWorktreeResponse,
190 client: Arc<Client>,
191 user_store: ModelHandle<UserStore>,
192 languages: Arc<LanguageRegistry>,
193 cx: &mut AsyncAppContext,
194 ) -> Result<ModelHandle<Self>> {
195 let worktree = join_response
196 .worktree
197 .ok_or_else(|| anyhow!("empty worktree"))?;
198
199 let remote_id = worktree.id;
200 let replica_id = join_response.replica_id as ReplicaId;
201 let root_char_bag: CharBag = worktree
202 .root_name
203 .chars()
204 .map(|c| c.to_ascii_lowercase())
205 .collect();
206 let root_name = worktree.root_name.clone();
207 let (entries_by_path, entries_by_id) = cx
208 .background()
209 .spawn(async move {
210 let mut entries_by_path_edits = Vec::new();
211 let mut entries_by_id_edits = Vec::new();
212 for entry in worktree.entries {
213 match Entry::try_from((&root_char_bag, entry)) {
214 Ok(entry) => {
215 entries_by_id_edits.push(Edit::Insert(PathEntry {
216 id: entry.id,
217 path: entry.path.clone(),
218 is_ignored: entry.is_ignored,
219 scan_id: 0,
220 }));
221 entries_by_path_edits.push(Edit::Insert(entry));
222 }
223 Err(err) => log::warn!("error for remote worktree entry {:?}", err),
224 }
225 }
226
227 let mut entries_by_path = SumTree::new();
228 let mut entries_by_id = SumTree::new();
229 entries_by_path.edit(entries_by_path_edits, &());
230 entries_by_id.edit(entries_by_id_edits, &());
231 (entries_by_path, entries_by_id)
232 })
233 .await;
234
235 let user_ids = join_response
236 .collaborators
237 .iter()
238 .map(|peer| peer.user_id)
239 .collect();
240 user_store
241 .update(cx, |user_store, cx| user_store.load_users(user_ids, cx))
242 .await?;
243 let mut collaborators = HashMap::default();
244 for message in join_response.collaborators {
245 let collaborator = Collaborator::from_proto(message, &user_store, cx).await?;
246 collaborators.insert(collaborator.peer_id, collaborator);
247 }
248
249 let worktree = cx.update(|cx| {
250 cx.add_model(|cx: &mut ModelContext<Worktree>| {
251 let snapshot = Snapshot {
252 id: cx.model_id(),
253 scan_id: 0,
254 abs_path: Path::new("").into(),
255 root_name,
256 root_char_bag,
257 ignores: Default::default(),
258 entries_by_path,
259 entries_by_id,
260 removed_entry_ids: Default::default(),
261 next_entry_id: Default::default(),
262 };
263
264 let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
265 let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());
266
267 cx.background()
268 .spawn(async move {
269 while let Some(update) = updates_rx.recv().await {
270 let mut snapshot = snapshot_tx.borrow().clone();
271 if let Err(error) = snapshot.apply_update(update) {
272 log::error!("error applying worktree update: {}", error);
273 }
274 *snapshot_tx.borrow_mut() = snapshot;
275 }
276 })
277 .detach();
278
279 {
280 let mut snapshot_rx = snapshot_rx.clone();
281 cx.spawn_weak(|this, mut cx| async move {
282 while let Some(_) = snapshot_rx.recv().await {
283 if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
284 this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
285 } else {
286 break;
287 }
288 }
289 })
290 .detach();
291 }
292
293 let _subscriptions = vec![
294 client.subscribe_to_entity(remote_id, cx, Self::handle_add_collaborator),
295 client.subscribe_to_entity(remote_id, cx, Self::handle_remove_collaborator),
296 client.subscribe_to_entity(remote_id, cx, Self::handle_update),
297 client.subscribe_to_entity(remote_id, cx, Self::handle_update_buffer),
298 client.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved),
299 client.subscribe_to_entity(remote_id, cx, Self::handle_unshare),
300 ];
301
302 Worktree::Remote(RemoteWorktree {
303 remote_id,
304 replica_id,
305 snapshot,
306 snapshot_rx,
307 updates_tx,
308 client: client.clone(),
309 loading_buffers: Default::default(),
310 open_buffers: Default::default(),
311 diagnostic_summaries: HashMap::default(),
312 collaborators,
313 queued_operations: Default::default(),
314 languages,
315 user_store,
316 _subscriptions,
317 })
318 })
319 });
320
321 Ok(worktree)
322 }
323
324 pub fn as_local(&self) -> Option<&LocalWorktree> {
325 if let Worktree::Local(worktree) = self {
326 Some(worktree)
327 } else {
328 None
329 }
330 }
331
332 pub fn as_local_mut(&mut self) -> Option<&mut LocalWorktree> {
333 if let Worktree::Local(worktree) = self {
334 Some(worktree)
335 } else {
336 None
337 }
338 }
339
340 pub fn as_remote_mut(&mut self) -> Option<&mut RemoteWorktree> {
341 if let Worktree::Remote(worktree) = self {
342 Some(worktree)
343 } else {
344 None
345 }
346 }
347
348 pub fn snapshot(&self) -> Snapshot {
349 match self {
350 Worktree::Local(worktree) => worktree.snapshot(),
351 Worktree::Remote(worktree) => worktree.snapshot(),
352 }
353 }
354
355 pub fn replica_id(&self) -> ReplicaId {
356 match self {
357 Worktree::Local(_) => 0,
358 Worktree::Remote(worktree) => worktree.replica_id,
359 }
360 }
361
362 pub fn languages(&self) -> &Arc<LanguageRegistry> {
363 match self {
364 Worktree::Local(worktree) => &worktree.languages,
365 Worktree::Remote(worktree) => &worktree.languages,
366 }
367 }
368
369 pub fn user_store(&self) -> &ModelHandle<UserStore> {
370 match self {
371 Worktree::Local(worktree) => &worktree.user_store,
372 Worktree::Remote(worktree) => &worktree.user_store,
373 }
374 }
375
376 pub fn handle_add_collaborator(
377 &mut self,
378 mut envelope: TypedEnvelope<proto::AddCollaborator>,
379 _: Arc<Client>,
380 cx: &mut ModelContext<Self>,
381 ) -> Result<()> {
382 let user_store = self.user_store().clone();
383 let collaborator = envelope
384 .payload
385 .collaborator
386 .take()
387 .ok_or_else(|| anyhow!("empty collaborator"))?;
388
389 cx.spawn(|this, mut cx| {
390 async move {
391 let collaborator =
392 Collaborator::from_proto(collaborator, &user_store, &mut cx).await?;
393 this.update(&mut cx, |this, cx| match this {
394 Worktree::Local(worktree) => worktree.add_collaborator(collaborator, cx),
395 Worktree::Remote(worktree) => worktree.add_collaborator(collaborator, cx),
396 });
397 Ok(())
398 }
399 .log_err()
400 })
401 .detach();
402
403 Ok(())
404 }
405
406 pub fn handle_remove_collaborator(
407 &mut self,
408 envelope: TypedEnvelope<proto::RemoveCollaborator>,
409 _: Arc<Client>,
410 cx: &mut ModelContext<Self>,
411 ) -> Result<()> {
412 match self {
413 Worktree::Local(worktree) => worktree.remove_collaborator(envelope, cx),
414 Worktree::Remote(worktree) => worktree.remove_collaborator(envelope, cx),
415 }
416 }
417
418 pub fn handle_update(
419 &mut self,
420 envelope: TypedEnvelope<proto::UpdateWorktree>,
421 _: Arc<Client>,
422 cx: &mut ModelContext<Self>,
423 ) -> anyhow::Result<()> {
424 self.as_remote_mut()
425 .unwrap()
426 .update_from_remote(envelope, cx)
427 }
428
429 pub fn handle_open_buffer(
430 &mut self,
431 envelope: TypedEnvelope<proto::OpenBuffer>,
432 rpc: Arc<Client>,
433 cx: &mut ModelContext<Self>,
434 ) -> anyhow::Result<()> {
435 let receipt = envelope.receipt();
436
437 let response = self
438 .as_local_mut()
439 .unwrap()
440 .open_remote_buffer(envelope, cx);
441
442 cx.background()
443 .spawn(
444 async move {
445 rpc.respond(receipt, response.await?).await?;
446 Ok(())
447 }
448 .log_err(),
449 )
450 .detach();
451
452 Ok(())
453 }
454
455 pub fn handle_close_buffer(
456 &mut self,
457 envelope: TypedEnvelope<proto::CloseBuffer>,
458 _: Arc<Client>,
459 cx: &mut ModelContext<Self>,
460 ) -> anyhow::Result<()> {
461 self.as_local_mut()
462 .unwrap()
463 .close_remote_buffer(envelope, cx)
464 }
465
466 pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
467 match self {
468 Worktree::Local(worktree) => &worktree.collaborators,
469 Worktree::Remote(worktree) => &worktree.collaborators,
470 }
471 }
472
473 pub fn diagnostic_summaries<'a>(
474 &'a self,
475 cx: &'a AppContext,
476 ) -> impl Iterator<Item = DiagnosticSummary> {
477 std::iter::empty()
478 }
479
480 pub fn loading_buffers<'a>(&'a mut self) -> &'a mut LoadingBuffers {
481 match self {
482 Worktree::Local(worktree) => &mut worktree.loading_buffers,
483 Worktree::Remote(worktree) => &mut worktree.loading_buffers,
484 }
485 }
486
487 pub fn open_buffer(
488 &mut self,
489 path: impl AsRef<Path>,
490 cx: &mut ModelContext<Self>,
491 ) -> Task<Result<ModelHandle<Buffer>>> {
492 let path = path.as_ref();
493
494 // If there is already a buffer for the given path, then return it.
495 let existing_buffer = match self {
496 Worktree::Local(worktree) => worktree.get_open_buffer(path, cx),
497 Worktree::Remote(worktree) => worktree.get_open_buffer(path, cx),
498 };
499 if let Some(existing_buffer) = existing_buffer {
500 return cx.spawn(move |_, _| async move { Ok(existing_buffer) });
501 }
502
503 let path: Arc<Path> = Arc::from(path);
504 let mut loading_watch = match self.loading_buffers().entry(path.clone()) {
505 // If the given path is already being loaded, then wait for that existing
506 // task to complete and return the same buffer.
507 hash_map::Entry::Occupied(e) => e.get().clone(),
508
509 // Otherwise, record the fact that this path is now being loaded.
510 hash_map::Entry::Vacant(entry) => {
511 let (mut tx, rx) = postage::watch::channel();
512 entry.insert(rx.clone());
513
514 let load_buffer = match self {
515 Worktree::Local(worktree) => worktree.open_buffer(&path, cx),
516 Worktree::Remote(worktree) => worktree.open_buffer(&path, cx),
517 };
518 cx.spawn(move |this, mut cx| async move {
519 let result = load_buffer.await;
520
521 // After the buffer loads, record the fact that it is no longer
522 // loading.
523 this.update(&mut cx, |this, _| this.loading_buffers().remove(&path));
524 *tx.borrow_mut() = Some(result.map_err(|e| Arc::new(e)));
525 })
526 .detach();
527 rx
528 }
529 };
530
531 cx.spawn(|_, _| async move {
532 loop {
533 if let Some(result) = loading_watch.borrow().as_ref() {
534 return result.clone().map_err(|e| anyhow!("{}", e));
535 }
536 loading_watch.recv().await;
537 }
538 })
539 }
540
541 #[cfg(feature = "test-support")]
542 pub fn has_open_buffer(&self, path: impl AsRef<Path>, cx: &AppContext) -> bool {
543 let mut open_buffers: Box<dyn Iterator<Item = _>> = match self {
544 Worktree::Local(worktree) => Box::new(worktree.open_buffers.values()),
545 Worktree::Remote(worktree) => {
546 Box::new(worktree.open_buffers.values().filter_map(|buf| {
547 if let RemoteBuffer::Loaded(buf) = buf {
548 Some(buf)
549 } else {
550 None
551 }
552 }))
553 }
554 };
555
556 let path = path.as_ref();
557 open_buffers
558 .find(|buffer| {
559 if let Some(file) = buffer.upgrade(cx).and_then(|buffer| buffer.read(cx).file()) {
560 file.path().as_ref() == path
561 } else {
562 false
563 }
564 })
565 .is_some()
566 }
567
568 pub fn handle_update_buffer(
569 &mut self,
570 envelope: TypedEnvelope<proto::UpdateBuffer>,
571 _: Arc<Client>,
572 cx: &mut ModelContext<Self>,
573 ) -> Result<()> {
574 let payload = envelope.payload.clone();
575 let buffer_id = payload.buffer_id as usize;
576 let ops = payload
577 .operations
578 .into_iter()
579 .map(|op| language::proto::deserialize_operation(op))
580 .collect::<Result<Vec<_>, _>>()?;
581
582 match self {
583 Worktree::Local(worktree) => {
584 let buffer = worktree
585 .open_buffers
586 .get(&buffer_id)
587 .and_then(|buf| buf.upgrade(cx))
588 .ok_or_else(|| {
589 anyhow!("invalid buffer {} in update buffer message", buffer_id)
590 })?;
591 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
592 }
593 Worktree::Remote(worktree) => match worktree.open_buffers.get_mut(&buffer_id) {
594 Some(RemoteBuffer::Operations(pending_ops)) => pending_ops.extend(ops),
595 Some(RemoteBuffer::Loaded(buffer)) => {
596 if let Some(buffer) = buffer.upgrade(cx) {
597 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
598 } else {
599 worktree
600 .open_buffers
601 .insert(buffer_id, RemoteBuffer::Operations(ops));
602 }
603 }
604 None => {
605 worktree
606 .open_buffers
607 .insert(buffer_id, RemoteBuffer::Operations(ops));
608 }
609 },
610 }
611
612 Ok(())
613 }
614
615 pub fn handle_save_buffer(
616 &mut self,
617 envelope: TypedEnvelope<proto::SaveBuffer>,
618 rpc: Arc<Client>,
619 cx: &mut ModelContext<Self>,
620 ) -> Result<()> {
621 let sender_id = envelope.original_sender_id()?;
622 let buffer = self
623 .as_local()
624 .unwrap()
625 .shared_buffers
626 .get(&sender_id)
627 .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
628 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
629
630 let receipt = envelope.receipt();
631 let worktree_id = envelope.payload.worktree_id;
632 let buffer_id = envelope.payload.buffer_id;
633 let save = cx.spawn(|_, mut cx| async move {
634 buffer.update(&mut cx, |buffer, cx| buffer.save(cx))?.await
635 });
636
637 cx.background()
638 .spawn(
639 async move {
640 let (version, mtime) = save.await?;
641
642 rpc.respond(
643 receipt,
644 proto::BufferSaved {
645 worktree_id,
646 buffer_id,
647 version: (&version).into(),
648 mtime: Some(mtime.into()),
649 },
650 )
651 .await?;
652
653 Ok(())
654 }
655 .log_err(),
656 )
657 .detach();
658
659 Ok(())
660 }
661
662 pub fn handle_buffer_saved(
663 &mut self,
664 envelope: TypedEnvelope<proto::BufferSaved>,
665 _: Arc<Client>,
666 cx: &mut ModelContext<Self>,
667 ) -> Result<()> {
668 let payload = envelope.payload.clone();
669 let worktree = self.as_remote_mut().unwrap();
670 if let Some(buffer) = worktree
671 .open_buffers
672 .get(&(payload.buffer_id as usize))
673 .and_then(|buf| buf.upgrade(cx))
674 {
675 buffer.update(cx, |buffer, cx| {
676 let version = payload.version.try_into()?;
677 let mtime = payload
678 .mtime
679 .ok_or_else(|| anyhow!("missing mtime"))?
680 .into();
681 buffer.did_save(version, mtime, None, cx);
682 Result::<_, anyhow::Error>::Ok(())
683 })?;
684 }
685 Ok(())
686 }
687
688 pub fn handle_unshare(
689 &mut self,
690 _: TypedEnvelope<proto::UnshareWorktree>,
691 _: Arc<Client>,
692 cx: &mut ModelContext<Self>,
693 ) -> Result<()> {
694 cx.emit(Event::Closed);
695 Ok(())
696 }
697
698 fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
699 match self {
700 Self::Local(worktree) => {
701 let is_fake_fs = worktree.fs.is_fake();
702 worktree.snapshot = worktree.background_snapshot.lock().clone();
703 if worktree.is_scanning() {
704 if worktree.poll_task.is_none() {
705 worktree.poll_task = Some(cx.spawn(|this, mut cx| async move {
706 if is_fake_fs {
707 smol::future::yield_now().await;
708 } else {
709 smol::Timer::after(Duration::from_millis(100)).await;
710 }
711 this.update(&mut cx, |this, cx| {
712 this.as_local_mut().unwrap().poll_task = None;
713 this.poll_snapshot(cx);
714 })
715 }));
716 }
717 } else {
718 worktree.poll_task.take();
719 self.update_open_buffers(cx);
720 }
721 }
722 Self::Remote(worktree) => {
723 worktree.snapshot = worktree.snapshot_rx.borrow().clone();
724 self.update_open_buffers(cx);
725 }
726 };
727
728 cx.notify();
729 }
730
731 fn update_open_buffers(&mut self, cx: &mut ModelContext<Self>) {
732 let open_buffers: Box<dyn Iterator<Item = _>> = match &self {
733 Self::Local(worktree) => Box::new(worktree.open_buffers.iter()),
734 Self::Remote(worktree) => {
735 Box::new(worktree.open_buffers.iter().filter_map(|(id, buf)| {
736 if let RemoteBuffer::Loaded(buf) = buf {
737 Some((id, buf))
738 } else {
739 None
740 }
741 }))
742 }
743 };
744
745 let local = self.as_local().is_some();
746 let worktree_path = self.abs_path.clone();
747 let worktree_handle = cx.handle();
748 let mut buffers_to_delete = Vec::new();
749 for (buffer_id, buffer) in open_buffers {
750 if let Some(buffer) = buffer.upgrade(cx) {
751 buffer.update(cx, |buffer, cx| {
752 if let Some(old_file) = buffer.file() {
753 let new_file = if let Some(entry) = old_file
754 .entry_id()
755 .and_then(|entry_id| self.entry_for_id(entry_id))
756 {
757 File {
758 is_local: local,
759 worktree_path: worktree_path.clone(),
760 entry_id: Some(entry.id),
761 mtime: entry.mtime,
762 path: entry.path.clone(),
763 worktree: worktree_handle.clone(),
764 }
765 } else if let Some(entry) = self.entry_for_path(old_file.path().as_ref()) {
766 File {
767 is_local: local,
768 worktree_path: worktree_path.clone(),
769 entry_id: Some(entry.id),
770 mtime: entry.mtime,
771 path: entry.path.clone(),
772 worktree: worktree_handle.clone(),
773 }
774 } else {
775 File {
776 is_local: local,
777 worktree_path: worktree_path.clone(),
778 entry_id: None,
779 path: old_file.path().clone(),
780 mtime: old_file.mtime(),
781 worktree: worktree_handle.clone(),
782 }
783 };
784
785 if let Some(task) = buffer.file_updated(Box::new(new_file), cx) {
786 task.detach();
787 }
788 }
789 });
790 } else {
791 buffers_to_delete.push(*buffer_id);
792 }
793 }
794
795 for buffer_id in buffers_to_delete {
796 match self {
797 Self::Local(worktree) => {
798 worktree.open_buffers.remove(&buffer_id);
799 }
800 Self::Remote(worktree) => {
801 worktree.open_buffers.remove(&buffer_id);
802 }
803 }
804 }
805 }
806
807 fn update_diagnostics(
808 &mut self,
809 params: lsp::PublishDiagnosticsParams,
810 cx: &mut ModelContext<Worktree>,
811 ) -> Result<()> {
812 let this = self.as_local_mut().ok_or_else(|| anyhow!("not local"))?;
813 let abs_path = params
814 .uri
815 .to_file_path()
816 .map_err(|_| anyhow!("URI is not a file"))?;
817 let worktree_path = Arc::from(
818 abs_path
819 .strip_prefix(&this.abs_path)
820 .context("path is not within worktree")?,
821 );
822
823 let mut group_ids_by_diagnostic_range = HashMap::default();
824 let mut diagnostics_by_group_id = HashMap::default();
825 let mut next_group_id = 0;
826 for diagnostic in ¶ms.diagnostics {
827 let source = diagnostic.source.as_ref();
828 let code = diagnostic.code.as_ref();
829 let group_id = diagnostic_ranges(&diagnostic, &abs_path)
830 .find_map(|range| group_ids_by_diagnostic_range.get(&(source, code, range)))
831 .copied()
832 .unwrap_or_else(|| {
833 let group_id = post_inc(&mut next_group_id);
834 for range in diagnostic_ranges(&diagnostic, &abs_path) {
835 group_ids_by_diagnostic_range.insert((source, code, range), group_id);
836 }
837 group_id
838 });
839
840 diagnostics_by_group_id
841 .entry(group_id)
842 .or_insert(Vec::new())
843 .push(DiagnosticEntry {
844 range: diagnostic.range.start.to_point_utf16()
845 ..diagnostic.range.end.to_point_utf16(),
846 diagnostic: Diagnostic {
847 source: diagnostic.source.clone(),
848 code: diagnostic.code.clone().map(|code| match code {
849 lsp::NumberOrString::Number(code) => code.to_string(),
850 lsp::NumberOrString::String(code) => code,
851 }),
852 severity: diagnostic.severity.unwrap_or(DiagnosticSeverity::ERROR),
853 message: diagnostic.message.clone(),
854 group_id,
855 is_primary: false,
856 },
857 });
858 }
859
860 let diagnostics = diagnostics_by_group_id
861 .into_values()
862 .flat_map(|mut diagnostics| {
863 let primary = diagnostics
864 .iter_mut()
865 .min_by_key(|entry| entry.diagnostic.severity)
866 .unwrap();
867 primary.diagnostic.is_primary = true;
868 diagnostics
869 })
870 .collect::<Vec<_>>();
871
872 for buffer in this.open_buffers.values() {
873 if let Some(buffer) = buffer.upgrade(cx) {
874 if buffer
875 .read(cx)
876 .file()
877 .map_or(false, |file| *file.path() == worktree_path)
878 {
879 let (remote_id, operation) = buffer.update(cx, |buffer, cx| {
880 (
881 buffer.remote_id(),
882 buffer.update_diagnostics(params.version, diagnostics, cx),
883 )
884 });
885 self.send_buffer_update(remote_id, operation?, cx);
886 return Ok(());
887 }
888 }
889 }
890
891 this.diagnostics.insert(worktree_path, diagnostics);
892 Ok(())
893 }
894
895 fn send_buffer_update(
896 &mut self,
897 buffer_id: u64,
898 operation: Operation,
899 cx: &mut ModelContext<Self>,
900 ) {
901 if let Some((rpc, remote_id)) = match self {
902 Worktree::Local(worktree) => worktree
903 .remote_id
904 .borrow()
905 .map(|id| (worktree.client.clone(), id)),
906 Worktree::Remote(worktree) => Some((worktree.client.clone(), worktree.remote_id)),
907 } {
908 cx.spawn(|worktree, mut cx| async move {
909 if let Err(error) = rpc
910 .request(proto::UpdateBuffer {
911 worktree_id: remote_id,
912 buffer_id,
913 operations: vec![language::proto::serialize_operation(&operation)],
914 })
915 .await
916 {
917 worktree.update(&mut cx, |worktree, _| {
918 log::error!("error sending buffer operation: {}", error);
919 match worktree {
920 Worktree::Local(t) => &mut t.queued_operations,
921 Worktree::Remote(t) => &mut t.queued_operations,
922 }
923 .push((buffer_id, operation));
924 });
925 }
926 })
927 .detach();
928 }
929 }
930}
931
932#[derive(Clone)]
933pub struct Snapshot {
934 id: usize,
935 scan_id: usize,
936 abs_path: Arc<Path>,
937 root_name: String,
938 root_char_bag: CharBag,
939 ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
940 entries_by_path: SumTree<Entry>,
941 entries_by_id: SumTree<PathEntry>,
942 removed_entry_ids: HashMap<u64, usize>,
943 next_entry_id: Arc<AtomicUsize>,
944}
945
946pub struct LocalWorktree {
947 snapshot: Snapshot,
948 config: WorktreeConfig,
949 background_snapshot: Arc<Mutex<Snapshot>>,
950 last_scan_state_rx: watch::Receiver<ScanState>,
951 _background_scanner_task: Option<Task<()>>,
952 _maintain_remote_id_task: Task<Option<()>>,
953 poll_task: Option<Task<()>>,
954 remote_id: watch::Receiver<Option<u64>>,
955 share: Option<ShareState>,
956 loading_buffers: LoadingBuffers,
957 open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
958 shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
959 diagnostics: HashMap<Arc<Path>, Vec<DiagnosticEntry<PointUtf16>>>,
960 diagnostic_summaries: HashMap<Arc<Path>, DiagnosticSummary>,
961 collaborators: HashMap<PeerId, Collaborator>,
962 queued_operations: Vec<(u64, Operation)>,
963 languages: Arc<LanguageRegistry>,
964 client: Arc<Client>,
965 user_store: ModelHandle<UserStore>,
966 fs: Arc<dyn Fs>,
967 language_servers: HashMap<String, Arc<LanguageServer>>,
968}
969
970struct ShareState {
971 snapshots_tx: Sender<Snapshot>,
972 _subscriptions: Vec<client::Subscription>,
973}
974
975pub struct RemoteWorktree {
976 remote_id: u64,
977 snapshot: Snapshot,
978 snapshot_rx: watch::Receiver<Snapshot>,
979 client: Arc<Client>,
980 updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
981 replica_id: ReplicaId,
982 loading_buffers: LoadingBuffers,
983 open_buffers: HashMap<usize, RemoteBuffer>,
984 collaborators: HashMap<PeerId, Collaborator>,
985 diagnostic_summaries: HashMap<Arc<Path>, DiagnosticSummary>,
986 languages: Arc<LanguageRegistry>,
987 user_store: ModelHandle<UserStore>,
988 queued_operations: Vec<(u64, Operation)>,
989 _subscriptions: Vec<client::Subscription>,
990}
991
992type LoadingBuffers = HashMap<
993 Arc<Path>,
994 postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
995>;
996
997#[derive(Default, Deserialize)]
998struct WorktreeConfig {
999 collaborators: Vec<String>,
1000}
1001
1002impl LocalWorktree {
1003 async fn new(
1004 client: Arc<Client>,
1005 user_store: ModelHandle<UserStore>,
1006 path: impl Into<Arc<Path>>,
1007 fs: Arc<dyn Fs>,
1008 languages: Arc<LanguageRegistry>,
1009 cx: &mut AsyncAppContext,
1010 ) -> Result<(ModelHandle<Worktree>, Sender<ScanState>)> {
1011 let abs_path = path.into();
1012 let path: Arc<Path> = Arc::from(Path::new(""));
1013 let next_entry_id = AtomicUsize::new(0);
1014
1015 // After determining whether the root entry is a file or a directory, populate the
1016 // snapshot's "root name", which will be used for the purpose of fuzzy matching.
1017 let root_name = abs_path
1018 .file_name()
1019 .map_or(String::new(), |f| f.to_string_lossy().to_string());
1020 let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
1021 let metadata = fs.metadata(&abs_path).await?;
1022
1023 let mut config = WorktreeConfig::default();
1024 if let Ok(zed_toml) = fs.load(&abs_path.join(".zed.toml")).await {
1025 if let Ok(parsed) = toml::from_str(&zed_toml) {
1026 config = parsed;
1027 }
1028 }
1029
1030 let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
1031 let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
1032 let tree = cx.add_model(move |cx: &mut ModelContext<Worktree>| {
1033 let mut snapshot = Snapshot {
1034 id: cx.model_id(),
1035 scan_id: 0,
1036 abs_path,
1037 root_name: root_name.clone(),
1038 root_char_bag,
1039 ignores: Default::default(),
1040 entries_by_path: Default::default(),
1041 entries_by_id: Default::default(),
1042 removed_entry_ids: Default::default(),
1043 next_entry_id: Arc::new(next_entry_id),
1044 };
1045 if let Some(metadata) = metadata {
1046 snapshot.insert_entry(
1047 Entry::new(
1048 path.into(),
1049 &metadata,
1050 &snapshot.next_entry_id,
1051 snapshot.root_char_bag,
1052 ),
1053 fs.as_ref(),
1054 );
1055 }
1056
1057 let (mut remote_id_tx, remote_id_rx) = watch::channel();
1058 let _maintain_remote_id_task = cx.spawn_weak({
1059 let rpc = client.clone();
1060 move |this, cx| {
1061 async move {
1062 let mut status = rpc.status();
1063 while let Some(status) = status.recv().await {
1064 if let Some(this) = this.upgrade(&cx) {
1065 let remote_id = if let client::Status::Connected { .. } = status {
1066 let authorized_logins = this.read_with(&cx, |this, _| {
1067 this.as_local().unwrap().config.collaborators.clone()
1068 });
1069 let response = rpc
1070 .request(proto::OpenWorktree {
1071 root_name: root_name.clone(),
1072 authorized_logins,
1073 })
1074 .await?;
1075
1076 Some(response.worktree_id)
1077 } else {
1078 None
1079 };
1080 if remote_id_tx.send(remote_id).await.is_err() {
1081 break;
1082 }
1083 }
1084 }
1085 Ok(())
1086 }
1087 .log_err()
1088 }
1089 });
1090
1091 let tree = Self {
1092 snapshot: snapshot.clone(),
1093 config,
1094 remote_id: remote_id_rx,
1095 background_snapshot: Arc::new(Mutex::new(snapshot)),
1096 last_scan_state_rx,
1097 _background_scanner_task: None,
1098 _maintain_remote_id_task,
1099 share: None,
1100 poll_task: None,
1101 loading_buffers: Default::default(),
1102 open_buffers: Default::default(),
1103 shared_buffers: Default::default(),
1104 diagnostics: Default::default(),
1105 diagnostic_summaries: Default::default(),
1106 queued_operations: Default::default(),
1107 collaborators: Default::default(),
1108 languages,
1109 client,
1110 user_store,
1111 fs,
1112 language_servers: Default::default(),
1113 };
1114
1115 cx.spawn_weak(|this, mut cx| async move {
1116 while let Ok(scan_state) = scan_states_rx.recv().await {
1117 if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
1118 let to_send = handle.update(&mut cx, |this, cx| {
1119 last_scan_state_tx.blocking_send(scan_state).ok();
1120 this.poll_snapshot(cx);
1121 let tree = this.as_local_mut().unwrap();
1122 if !tree.is_scanning() {
1123 if let Some(share) = tree.share.as_ref() {
1124 return Some((tree.snapshot(), share.snapshots_tx.clone()));
1125 }
1126 }
1127 None
1128 });
1129
1130 if let Some((snapshot, snapshots_to_send_tx)) = to_send {
1131 if let Err(err) = snapshots_to_send_tx.send(snapshot).await {
1132 log::error!("error submitting snapshot to send {}", err);
1133 }
1134 }
1135 } else {
1136 break;
1137 }
1138 }
1139 })
1140 .detach();
1141
1142 Worktree::Local(tree)
1143 });
1144
1145 Ok((tree, scan_states_tx))
1146 }
1147
1148 pub fn languages(&self) -> &LanguageRegistry {
1149 &self.languages
1150 }
1151
1152 pub fn ensure_language_server(
1153 &mut self,
1154 language: &Language,
1155 cx: &mut ModelContext<Worktree>,
1156 ) -> Option<Arc<LanguageServer>> {
1157 if let Some(server) = self.language_servers.get(language.name()) {
1158 return Some(server.clone());
1159 }
1160
1161 if let Some(language_server) = language
1162 .start_server(self.abs_path(), cx)
1163 .log_err()
1164 .flatten()
1165 {
1166 let (diagnostics_tx, diagnostics_rx) = smol::channel::unbounded();
1167 language_server
1168 .on_notification::<lsp::notification::PublishDiagnostics, _>(move |params| {
1169 smol::block_on(diagnostics_tx.send(params)).ok();
1170 })
1171 .detach();
1172 cx.spawn_weak(|this, mut cx| async move {
1173 while let Ok(diagnostics) = diagnostics_rx.recv().await {
1174 if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
1175 handle.update(&mut cx, |this, cx| {
1176 this.update_diagnostics(diagnostics, cx).log_err();
1177 });
1178 } else {
1179 break;
1180 }
1181 }
1182 })
1183 .detach();
1184
1185 self.language_servers
1186 .insert(language.name().to_string(), language_server.clone());
1187 Some(language_server.clone())
1188 } else {
1189 None
1190 }
1191 }
1192
1193 fn get_open_buffer(
1194 &mut self,
1195 path: &Path,
1196 cx: &mut ModelContext<Worktree>,
1197 ) -> Option<ModelHandle<Buffer>> {
1198 let handle = cx.handle();
1199 let mut result = None;
1200 self.open_buffers.retain(|_buffer_id, buffer| {
1201 if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
1202 if let Some(file) = buffer.read(cx.as_ref()).file() {
1203 if file.worktree_id() == handle.id() && file.path().as_ref() == path {
1204 result = Some(buffer);
1205 }
1206 }
1207 true
1208 } else {
1209 false
1210 }
1211 });
1212 result
1213 }
1214
1215 fn open_buffer(
1216 &mut self,
1217 path: &Path,
1218 cx: &mut ModelContext<Worktree>,
1219 ) -> Task<Result<ModelHandle<Buffer>>> {
1220 let path = Arc::from(path);
1221 cx.spawn(move |this, mut cx| async move {
1222 let (file, contents) = this
1223 .update(&mut cx, |t, cx| t.as_local().unwrap().load(&path, cx))
1224 .await?;
1225
1226 let (diagnostics, language, language_server) = this.update(&mut cx, |this, cx| {
1227 let this = this.as_local_mut().unwrap();
1228 let diagnostics = this.diagnostics.remove(&path);
1229 let language = this.languages.select_language(file.full_path()).cloned();
1230 let server = language
1231 .as_ref()
1232 .and_then(|language| this.ensure_language_server(language, cx));
1233 (diagnostics, language, server)
1234 });
1235
1236 let buffer = cx.add_model(|cx| {
1237 let mut buffer = Buffer::from_file(0, contents, Box::new(file), cx);
1238 buffer.set_language(language, language_server, cx);
1239 if let Some(diagnostics) = diagnostics {
1240 buffer.update_diagnostics(None, diagnostics, cx).unwrap();
1241 }
1242 buffer
1243 });
1244
1245 this.update(&mut cx, |this, _| {
1246 let this = this.as_local_mut().unwrap();
1247 this.open_buffers.insert(buffer.id(), buffer.downgrade());
1248 });
1249
1250 Ok(buffer)
1251 })
1252 }
1253
1254 pub fn open_remote_buffer(
1255 &mut self,
1256 envelope: TypedEnvelope<proto::OpenBuffer>,
1257 cx: &mut ModelContext<Worktree>,
1258 ) -> Task<Result<proto::OpenBufferResponse>> {
1259 cx.spawn(|this, mut cx| async move {
1260 let peer_id = envelope.original_sender_id();
1261 let path = Path::new(&envelope.payload.path);
1262 let buffer = this
1263 .update(&mut cx, |this, cx| this.open_buffer(path, cx))
1264 .await?;
1265 this.update(&mut cx, |this, cx| {
1266 this.as_local_mut()
1267 .unwrap()
1268 .shared_buffers
1269 .entry(peer_id?)
1270 .or_default()
1271 .insert(buffer.id() as u64, buffer.clone());
1272
1273 Ok(proto::OpenBufferResponse {
1274 buffer: Some(buffer.update(cx.as_mut(), |buffer, _| buffer.to_proto())),
1275 })
1276 })
1277 })
1278 }
1279
1280 pub fn close_remote_buffer(
1281 &mut self,
1282 envelope: TypedEnvelope<proto::CloseBuffer>,
1283 cx: &mut ModelContext<Worktree>,
1284 ) -> Result<()> {
1285 if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
1286 shared_buffers.remove(&envelope.payload.buffer_id);
1287 cx.notify();
1288 }
1289
1290 Ok(())
1291 }
1292
1293 pub fn add_collaborator(
1294 &mut self,
1295 collaborator: Collaborator,
1296 cx: &mut ModelContext<Worktree>,
1297 ) {
1298 self.collaborators
1299 .insert(collaborator.peer_id, collaborator);
1300 cx.notify();
1301 }
1302
1303 pub fn remove_collaborator(
1304 &mut self,
1305 envelope: TypedEnvelope<proto::RemoveCollaborator>,
1306 cx: &mut ModelContext<Worktree>,
1307 ) -> Result<()> {
1308 let peer_id = PeerId(envelope.payload.peer_id);
1309 let replica_id = self
1310 .collaborators
1311 .remove(&peer_id)
1312 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?
1313 .replica_id;
1314 self.shared_buffers.remove(&peer_id);
1315 for (_, buffer) in &self.open_buffers {
1316 if let Some(buffer) = buffer.upgrade(cx) {
1317 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
1318 }
1319 }
1320 cx.notify();
1321
1322 Ok(())
1323 }
1324
1325 pub fn scan_complete(&self) -> impl Future<Output = ()> {
1326 let mut scan_state_rx = self.last_scan_state_rx.clone();
1327 async move {
1328 let mut scan_state = Some(scan_state_rx.borrow().clone());
1329 while let Some(ScanState::Scanning) = scan_state {
1330 scan_state = scan_state_rx.recv().await;
1331 }
1332 }
1333 }
1334
1335 pub fn remote_id(&self) -> Option<u64> {
1336 *self.remote_id.borrow()
1337 }
1338
1339 pub fn next_remote_id(&self) -> impl Future<Output = Option<u64>> {
1340 let mut remote_id = self.remote_id.clone();
1341 async move {
1342 while let Some(remote_id) = remote_id.recv().await {
1343 if remote_id.is_some() {
1344 return remote_id;
1345 }
1346 }
1347 None
1348 }
1349 }
1350
1351 fn is_scanning(&self) -> bool {
1352 if let ScanState::Scanning = *self.last_scan_state_rx.borrow() {
1353 true
1354 } else {
1355 false
1356 }
1357 }
1358
1359 pub fn snapshot(&self) -> Snapshot {
1360 self.snapshot.clone()
1361 }
1362
1363 pub fn abs_path(&self) -> &Path {
1364 self.snapshot.abs_path.as_ref()
1365 }
1366
1367 pub fn contains_abs_path(&self, path: &Path) -> bool {
1368 path.starts_with(&self.snapshot.abs_path)
1369 }
1370
1371 fn absolutize(&self, path: &Path) -> PathBuf {
1372 if path.file_name().is_some() {
1373 self.snapshot.abs_path.join(path)
1374 } else {
1375 self.snapshot.abs_path.to_path_buf()
1376 }
1377 }
1378
1379 fn load(&self, path: &Path, cx: &mut ModelContext<Worktree>) -> Task<Result<(File, String)>> {
1380 let handle = cx.handle();
1381 let path = Arc::from(path);
1382 let worktree_path = self.abs_path.clone();
1383 let abs_path = self.absolutize(&path);
1384 let background_snapshot = self.background_snapshot.clone();
1385 let fs = self.fs.clone();
1386 cx.spawn(|this, mut cx| async move {
1387 let text = fs.load(&abs_path).await?;
1388 // Eagerly populate the snapshot with an updated entry for the loaded file
1389 let entry = refresh_entry(fs.as_ref(), &background_snapshot, path, &abs_path).await?;
1390 this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
1391 Ok((
1392 File {
1393 entry_id: Some(entry.id),
1394 worktree: handle,
1395 worktree_path,
1396 path: entry.path,
1397 mtime: entry.mtime,
1398 is_local: true,
1399 },
1400 text,
1401 ))
1402 })
1403 }
1404
1405 pub fn save_buffer_as(
1406 &self,
1407 buffer: ModelHandle<Buffer>,
1408 path: impl Into<Arc<Path>>,
1409 text: Rope,
1410 cx: &mut ModelContext<Worktree>,
1411 ) -> Task<Result<File>> {
1412 let save = self.save(path, text, cx);
1413 cx.spawn(|this, mut cx| async move {
1414 let entry = save.await?;
1415 this.update(&mut cx, |this, cx| {
1416 let this = this.as_local_mut().unwrap();
1417 this.open_buffers.insert(buffer.id(), buffer.downgrade());
1418 Ok(File {
1419 entry_id: Some(entry.id),
1420 worktree: cx.handle(),
1421 worktree_path: this.abs_path.clone(),
1422 path: entry.path,
1423 mtime: entry.mtime,
1424 is_local: true,
1425 })
1426 })
1427 })
1428 }
1429
1430 fn save(
1431 &self,
1432 path: impl Into<Arc<Path>>,
1433 text: Rope,
1434 cx: &mut ModelContext<Worktree>,
1435 ) -> Task<Result<Entry>> {
1436 let path = path.into();
1437 let abs_path = self.absolutize(&path);
1438 let background_snapshot = self.background_snapshot.clone();
1439 let fs = self.fs.clone();
1440 let save = cx.background().spawn(async move {
1441 fs.save(&abs_path, &text).await?;
1442 refresh_entry(fs.as_ref(), &background_snapshot, path.clone(), &abs_path).await
1443 });
1444
1445 cx.spawn(|this, mut cx| async move {
1446 let entry = save.await?;
1447 this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
1448 Ok(entry)
1449 })
1450 }
1451
1452 pub fn share(&mut self, cx: &mut ModelContext<Worktree>) -> Task<anyhow::Result<u64>> {
1453 let snapshot = self.snapshot();
1454 let share_request = self.share_request(cx);
1455 let rpc = self.client.clone();
1456 cx.spawn(|this, mut cx| async move {
1457 let share_request = if let Some(request) = share_request.await {
1458 request
1459 } else {
1460 return Err(anyhow!("failed to open worktree on the server"));
1461 };
1462
1463 let remote_id = share_request.worktree.as_ref().unwrap().id;
1464 let share_response = rpc.request(share_request).await?;
1465
1466 log::info!("sharing worktree {:?}", share_response);
1467 let (snapshots_to_send_tx, snapshots_to_send_rx) =
1468 smol::channel::unbounded::<Snapshot>();
1469
1470 cx.background()
1471 .spawn({
1472 let rpc = rpc.clone();
1473 async move {
1474 let mut prev_snapshot = snapshot;
1475 while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
1476 let message = snapshot.build_update(&prev_snapshot, remote_id, false);
1477 match rpc.send(message).await {
1478 Ok(()) => prev_snapshot = snapshot,
1479 Err(err) => log::error!("error sending snapshot diff {}", err),
1480 }
1481 }
1482 }
1483 })
1484 .detach();
1485
1486 this.update(&mut cx, |worktree, cx| {
1487 let _subscriptions = vec![
1488 rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_add_collaborator),
1489 rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_remove_collaborator),
1490 rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_open_buffer),
1491 rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_close_buffer),
1492 rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_update_buffer),
1493 rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_save_buffer),
1494 ];
1495
1496 let worktree = worktree.as_local_mut().unwrap();
1497 worktree.share = Some(ShareState {
1498 snapshots_tx: snapshots_to_send_tx,
1499 _subscriptions,
1500 });
1501 });
1502
1503 Ok(remote_id)
1504 })
1505 }
1506
1507 pub fn unshare(&mut self, cx: &mut ModelContext<Worktree>) {
1508 self.share.take();
1509 let rpc = self.client.clone();
1510 let remote_id = self.remote_id();
1511 cx.foreground()
1512 .spawn(
1513 async move {
1514 if let Some(worktree_id) = remote_id {
1515 rpc.send(proto::UnshareWorktree { worktree_id }).await?;
1516 }
1517 Ok(())
1518 }
1519 .log_err(),
1520 )
1521 .detach()
1522 }
1523
1524 fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<Option<proto::ShareWorktree>> {
1525 let remote_id = self.next_remote_id();
1526 let snapshot = self.snapshot();
1527 let root_name = self.root_name.clone();
1528 cx.background().spawn(async move {
1529 remote_id.await.map(|id| {
1530 let entries = snapshot
1531 .entries_by_path
1532 .cursor::<()>()
1533 .filter(|e| !e.is_ignored)
1534 .map(Into::into)
1535 .collect();
1536 proto::ShareWorktree {
1537 worktree: Some(proto::Worktree {
1538 id,
1539 root_name,
1540 entries,
1541 }),
1542 }
1543 })
1544 })
1545 }
1546}
1547
1548fn build_gitignore(abs_path: &Path, fs: &dyn Fs) -> Result<Gitignore> {
1549 let contents = smol::block_on(fs.load(&abs_path))?;
1550 let parent = abs_path.parent().unwrap_or(Path::new("/"));
1551 let mut builder = GitignoreBuilder::new(parent);
1552 for line in contents.lines() {
1553 builder.add_line(Some(abs_path.into()), line)?;
1554 }
1555 Ok(builder.build()?)
1556}
1557
1558impl Deref for Worktree {
1559 type Target = Snapshot;
1560
1561 fn deref(&self) -> &Self::Target {
1562 match self {
1563 Worktree::Local(worktree) => &worktree.snapshot,
1564 Worktree::Remote(worktree) => &worktree.snapshot,
1565 }
1566 }
1567}
1568
1569impl Deref for LocalWorktree {
1570 type Target = Snapshot;
1571
1572 fn deref(&self) -> &Self::Target {
1573 &self.snapshot
1574 }
1575}
1576
1577impl fmt::Debug for LocalWorktree {
1578 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1579 self.snapshot.fmt(f)
1580 }
1581}
1582
1583impl RemoteWorktree {
1584 fn get_open_buffer(
1585 &mut self,
1586 path: &Path,
1587 cx: &mut ModelContext<Worktree>,
1588 ) -> Option<ModelHandle<Buffer>> {
1589 let handle = cx.handle();
1590 let mut existing_buffer = None;
1591 self.open_buffers.retain(|_buffer_id, buffer| {
1592 if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
1593 if let Some(file) = buffer.read(cx.as_ref()).file() {
1594 if file.worktree_id() == handle.id() && file.path().as_ref() == path {
1595 existing_buffer = Some(buffer);
1596 }
1597 }
1598 true
1599 } else {
1600 false
1601 }
1602 });
1603 existing_buffer
1604 }
1605
1606 fn open_buffer(
1607 &mut self,
1608 path: &Path,
1609 cx: &mut ModelContext<Worktree>,
1610 ) -> Task<Result<ModelHandle<Buffer>>> {
1611 let rpc = self.client.clone();
1612 let replica_id = self.replica_id;
1613 let remote_worktree_id = self.remote_id;
1614 let root_path = self.snapshot.abs_path.clone();
1615 let path: Arc<Path> = Arc::from(path);
1616 let path_string = path.to_string_lossy().to_string();
1617 cx.spawn_weak(move |this, mut cx| async move {
1618 let entry = this
1619 .upgrade(&cx)
1620 .ok_or_else(|| anyhow!("worktree was closed"))?
1621 .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned())
1622 .ok_or_else(|| anyhow!("file does not exist"))?;
1623 let response = rpc
1624 .request(proto::OpenBuffer {
1625 worktree_id: remote_worktree_id as u64,
1626 path: path_string,
1627 })
1628 .await?;
1629
1630 let this = this
1631 .upgrade(&cx)
1632 .ok_or_else(|| anyhow!("worktree was closed"))?;
1633 let file = File {
1634 entry_id: Some(entry.id),
1635 worktree: this.clone(),
1636 worktree_path: root_path,
1637 path: entry.path,
1638 mtime: entry.mtime,
1639 is_local: false,
1640 };
1641 let language = this.read_with(&cx, |this, _| {
1642 use language::File;
1643 this.languages().select_language(file.full_path()).cloned()
1644 });
1645 let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
1646 let buffer_id = remote_buffer.id as usize;
1647 let buffer = cx.add_model(|cx| {
1648 Buffer::from_proto(replica_id, remote_buffer, Some(Box::new(file)), cx)
1649 .unwrap()
1650 .with_language(language, None, cx)
1651 });
1652 this.update(&mut cx, move |this, cx| {
1653 let this = this.as_remote_mut().unwrap();
1654 if let Some(RemoteBuffer::Operations(pending_ops)) = this
1655 .open_buffers
1656 .insert(buffer_id, RemoteBuffer::Loaded(buffer.downgrade()))
1657 {
1658 buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?;
1659 }
1660 Result::<_, anyhow::Error>::Ok(buffer)
1661 })
1662 })
1663 }
1664
1665 pub fn remote_id(&self) -> u64 {
1666 self.remote_id
1667 }
1668
1669 pub fn close_all_buffers(&mut self, cx: &mut MutableAppContext) {
1670 for (_, buffer) in self.open_buffers.drain() {
1671 if let RemoteBuffer::Loaded(buffer) = buffer {
1672 if let Some(buffer) = buffer.upgrade(cx) {
1673 buffer.update(cx, |buffer, cx| buffer.close(cx))
1674 }
1675 }
1676 }
1677 }
1678
1679 fn snapshot(&self) -> Snapshot {
1680 self.snapshot.clone()
1681 }
1682
1683 fn update_from_remote(
1684 &mut self,
1685 envelope: TypedEnvelope<proto::UpdateWorktree>,
1686 cx: &mut ModelContext<Worktree>,
1687 ) -> Result<()> {
1688 let mut tx = self.updates_tx.clone();
1689 let payload = envelope.payload.clone();
1690 cx.background()
1691 .spawn(async move {
1692 tx.send(payload).await.expect("receiver runs to completion");
1693 })
1694 .detach();
1695
1696 Ok(())
1697 }
1698
1699 pub fn add_collaborator(
1700 &mut self,
1701 collaborator: Collaborator,
1702 cx: &mut ModelContext<Worktree>,
1703 ) {
1704 self.collaborators
1705 .insert(collaborator.peer_id, collaborator);
1706 cx.notify();
1707 }
1708
1709 pub fn remove_collaborator(
1710 &mut self,
1711 envelope: TypedEnvelope<proto::RemoveCollaborator>,
1712 cx: &mut ModelContext<Worktree>,
1713 ) -> Result<()> {
1714 let peer_id = PeerId(envelope.payload.peer_id);
1715 let replica_id = self
1716 .collaborators
1717 .remove(&peer_id)
1718 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?
1719 .replica_id;
1720 for (_, buffer) in &self.open_buffers {
1721 if let Some(buffer) = buffer.upgrade(cx) {
1722 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
1723 }
1724 }
1725 cx.notify();
1726 Ok(())
1727 }
1728}
1729
1730enum RemoteBuffer {
1731 Operations(Vec<Operation>),
1732 Loaded(WeakModelHandle<Buffer>),
1733}
1734
1735impl RemoteBuffer {
1736 fn upgrade(&self, cx: &impl UpgradeModelHandle) -> Option<ModelHandle<Buffer>> {
1737 match self {
1738 Self::Operations(_) => None,
1739 Self::Loaded(buffer) => buffer.upgrade(cx),
1740 }
1741 }
1742}
1743
1744impl Snapshot {
1745 pub fn id(&self) -> usize {
1746 self.id
1747 }
1748
1749 pub fn build_update(
1750 &self,
1751 other: &Self,
1752 worktree_id: u64,
1753 include_ignored: bool,
1754 ) -> proto::UpdateWorktree {
1755 let mut updated_entries = Vec::new();
1756 let mut removed_entries = Vec::new();
1757 let mut self_entries = self
1758 .entries_by_id
1759 .cursor::<()>()
1760 .filter(|e| include_ignored || !e.is_ignored)
1761 .peekable();
1762 let mut other_entries = other
1763 .entries_by_id
1764 .cursor::<()>()
1765 .filter(|e| include_ignored || !e.is_ignored)
1766 .peekable();
1767 loop {
1768 match (self_entries.peek(), other_entries.peek()) {
1769 (Some(self_entry), Some(other_entry)) => {
1770 match Ord::cmp(&self_entry.id, &other_entry.id) {
1771 Ordering::Less => {
1772 let entry = self.entry_for_id(self_entry.id).unwrap().into();
1773 updated_entries.push(entry);
1774 self_entries.next();
1775 }
1776 Ordering::Equal => {
1777 if self_entry.scan_id != other_entry.scan_id {
1778 let entry = self.entry_for_id(self_entry.id).unwrap().into();
1779 updated_entries.push(entry);
1780 }
1781
1782 self_entries.next();
1783 other_entries.next();
1784 }
1785 Ordering::Greater => {
1786 removed_entries.push(other_entry.id as u64);
1787 other_entries.next();
1788 }
1789 }
1790 }
1791 (Some(self_entry), None) => {
1792 let entry = self.entry_for_id(self_entry.id).unwrap().into();
1793 updated_entries.push(entry);
1794 self_entries.next();
1795 }
1796 (None, Some(other_entry)) => {
1797 removed_entries.push(other_entry.id as u64);
1798 other_entries.next();
1799 }
1800 (None, None) => break,
1801 }
1802 }
1803
1804 proto::UpdateWorktree {
1805 updated_entries,
1806 removed_entries,
1807 worktree_id,
1808 }
1809 }
1810
1811 fn apply_update(&mut self, update: proto::UpdateWorktree) -> Result<()> {
1812 self.scan_id += 1;
1813 let scan_id = self.scan_id;
1814
1815 let mut entries_by_path_edits = Vec::new();
1816 let mut entries_by_id_edits = Vec::new();
1817 for entry_id in update.removed_entries {
1818 let entry_id = entry_id as usize;
1819 let entry = self
1820 .entry_for_id(entry_id)
1821 .ok_or_else(|| anyhow!("unknown entry"))?;
1822 entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
1823 entries_by_id_edits.push(Edit::Remove(entry.id));
1824 }
1825
1826 for entry in update.updated_entries {
1827 let entry = Entry::try_from((&self.root_char_bag, entry))?;
1828 if let Some(PathEntry { path, .. }) = self.entries_by_id.get(&entry.id, &()) {
1829 entries_by_path_edits.push(Edit::Remove(PathKey(path.clone())));
1830 }
1831 entries_by_id_edits.push(Edit::Insert(PathEntry {
1832 id: entry.id,
1833 path: entry.path.clone(),
1834 is_ignored: entry.is_ignored,
1835 scan_id,
1836 }));
1837 entries_by_path_edits.push(Edit::Insert(entry));
1838 }
1839
1840 self.entries_by_path.edit(entries_by_path_edits, &());
1841 self.entries_by_id.edit(entries_by_id_edits, &());
1842
1843 Ok(())
1844 }
1845
1846 pub fn file_count(&self) -> usize {
1847 self.entries_by_path.summary().file_count
1848 }
1849
1850 pub fn visible_file_count(&self) -> usize {
1851 self.entries_by_path.summary().visible_file_count
1852 }
1853
1854 fn traverse_from_offset(
1855 &self,
1856 include_dirs: bool,
1857 include_ignored: bool,
1858 start_offset: usize,
1859 ) -> Traversal {
1860 let mut cursor = self.entries_by_path.cursor();
1861 cursor.seek(
1862 &TraversalTarget::Count {
1863 count: start_offset,
1864 include_dirs,
1865 include_ignored,
1866 },
1867 Bias::Right,
1868 &(),
1869 );
1870 Traversal {
1871 cursor,
1872 include_dirs,
1873 include_ignored,
1874 }
1875 }
1876
1877 fn traverse_from_path(
1878 &self,
1879 include_dirs: bool,
1880 include_ignored: bool,
1881 path: &Path,
1882 ) -> Traversal {
1883 let mut cursor = self.entries_by_path.cursor();
1884 cursor.seek(&TraversalTarget::Path(path), Bias::Left, &());
1885 Traversal {
1886 cursor,
1887 include_dirs,
1888 include_ignored,
1889 }
1890 }
1891
1892 pub fn files(&self, include_ignored: bool, start: usize) -> Traversal {
1893 self.traverse_from_offset(false, include_ignored, start)
1894 }
1895
1896 pub fn entries(&self, include_ignored: bool) -> Traversal {
1897 self.traverse_from_offset(true, include_ignored, 0)
1898 }
1899
1900 pub fn paths(&self) -> impl Iterator<Item = &Arc<Path>> {
1901 let empty_path = Path::new("");
1902 self.entries_by_path
1903 .cursor::<()>()
1904 .filter(move |entry| entry.path.as_ref() != empty_path)
1905 .map(|entry| &entry.path)
1906 }
1907
1908 fn child_entries<'a>(&'a self, parent_path: &'a Path) -> ChildEntriesIter<'a> {
1909 let mut cursor = self.entries_by_path.cursor();
1910 cursor.seek(&TraversalTarget::Path(parent_path), Bias::Right, &());
1911 let traversal = Traversal {
1912 cursor,
1913 include_dirs: true,
1914 include_ignored: true,
1915 };
1916 ChildEntriesIter {
1917 traversal,
1918 parent_path,
1919 }
1920 }
1921
1922 pub fn root_entry(&self) -> Option<&Entry> {
1923 self.entry_for_path("")
1924 }
1925
1926 pub fn root_name(&self) -> &str {
1927 &self.root_name
1928 }
1929
1930 pub fn entry_for_path(&self, path: impl AsRef<Path>) -> Option<&Entry> {
1931 let path = path.as_ref();
1932 self.traverse_from_path(true, true, path)
1933 .entry()
1934 .and_then(|entry| {
1935 if entry.path.as_ref() == path {
1936 Some(entry)
1937 } else {
1938 None
1939 }
1940 })
1941 }
1942
1943 pub fn entry_for_id(&self, id: usize) -> Option<&Entry> {
1944 let entry = self.entries_by_id.get(&id, &())?;
1945 self.entry_for_path(&entry.path)
1946 }
1947
1948 pub fn inode_for_path(&self, path: impl AsRef<Path>) -> Option<u64> {
1949 self.entry_for_path(path.as_ref()).map(|e| e.inode)
1950 }
1951
1952 fn insert_entry(&mut self, mut entry: Entry, fs: &dyn Fs) -> Entry {
1953 if !entry.is_dir() && entry.path.file_name() == Some(&GITIGNORE) {
1954 let abs_path = self.abs_path.join(&entry.path);
1955 match build_gitignore(&abs_path, fs) {
1956 Ok(ignore) => {
1957 let ignore_dir_path = entry.path.parent().unwrap();
1958 self.ignores
1959 .insert(ignore_dir_path.into(), (Arc::new(ignore), self.scan_id));
1960 }
1961 Err(error) => {
1962 log::error!(
1963 "error loading .gitignore file {:?} - {:?}",
1964 &entry.path,
1965 error
1966 );
1967 }
1968 }
1969 }
1970
1971 self.reuse_entry_id(&mut entry);
1972 self.entries_by_path.insert_or_replace(entry.clone(), &());
1973 self.entries_by_id.insert_or_replace(
1974 PathEntry {
1975 id: entry.id,
1976 path: entry.path.clone(),
1977 is_ignored: entry.is_ignored,
1978 scan_id: self.scan_id,
1979 },
1980 &(),
1981 );
1982 entry
1983 }
1984
1985 fn populate_dir(
1986 &mut self,
1987 parent_path: Arc<Path>,
1988 entries: impl IntoIterator<Item = Entry>,
1989 ignore: Option<Arc<Gitignore>>,
1990 ) {
1991 let mut parent_entry = self
1992 .entries_by_path
1993 .get(&PathKey(parent_path.clone()), &())
1994 .unwrap()
1995 .clone();
1996 if let Some(ignore) = ignore {
1997 self.ignores.insert(parent_path, (ignore, self.scan_id));
1998 }
1999 if matches!(parent_entry.kind, EntryKind::PendingDir) {
2000 parent_entry.kind = EntryKind::Dir;
2001 } else {
2002 unreachable!();
2003 }
2004
2005 let mut entries_by_path_edits = vec![Edit::Insert(parent_entry)];
2006 let mut entries_by_id_edits = Vec::new();
2007
2008 for mut entry in entries {
2009 self.reuse_entry_id(&mut entry);
2010 entries_by_id_edits.push(Edit::Insert(PathEntry {
2011 id: entry.id,
2012 path: entry.path.clone(),
2013 is_ignored: entry.is_ignored,
2014 scan_id: self.scan_id,
2015 }));
2016 entries_by_path_edits.push(Edit::Insert(entry));
2017 }
2018
2019 self.entries_by_path.edit(entries_by_path_edits, &());
2020 self.entries_by_id.edit(entries_by_id_edits, &());
2021 }
2022
2023 fn reuse_entry_id(&mut self, entry: &mut Entry) {
2024 if let Some(removed_entry_id) = self.removed_entry_ids.remove(&entry.inode) {
2025 entry.id = removed_entry_id;
2026 } else if let Some(existing_entry) = self.entry_for_path(&entry.path) {
2027 entry.id = existing_entry.id;
2028 }
2029 }
2030
2031 fn remove_path(&mut self, path: &Path) {
2032 let mut new_entries;
2033 let removed_entries;
2034 {
2035 let mut cursor = self.entries_by_path.cursor::<TraversalProgress>();
2036 new_entries = cursor.slice(&TraversalTarget::Path(path), Bias::Left, &());
2037 removed_entries = cursor.slice(&TraversalTarget::PathSuccessor(path), Bias::Left, &());
2038 new_entries.push_tree(cursor.suffix(&()), &());
2039 }
2040 self.entries_by_path = new_entries;
2041
2042 let mut entries_by_id_edits = Vec::new();
2043 for entry in removed_entries.cursor::<()>() {
2044 let removed_entry_id = self
2045 .removed_entry_ids
2046 .entry(entry.inode)
2047 .or_insert(entry.id);
2048 *removed_entry_id = cmp::max(*removed_entry_id, entry.id);
2049 entries_by_id_edits.push(Edit::Remove(entry.id));
2050 }
2051 self.entries_by_id.edit(entries_by_id_edits, &());
2052
2053 if path.file_name() == Some(&GITIGNORE) {
2054 if let Some((_, scan_id)) = self.ignores.get_mut(path.parent().unwrap()) {
2055 *scan_id = self.scan_id;
2056 }
2057 }
2058 }
2059
2060 fn ignore_stack_for_path(&self, path: &Path, is_dir: bool) -> Arc<IgnoreStack> {
2061 let mut new_ignores = Vec::new();
2062 for ancestor in path.ancestors().skip(1) {
2063 if let Some((ignore, _)) = self.ignores.get(ancestor) {
2064 new_ignores.push((ancestor, Some(ignore.clone())));
2065 } else {
2066 new_ignores.push((ancestor, None));
2067 }
2068 }
2069
2070 let mut ignore_stack = IgnoreStack::none();
2071 for (parent_path, ignore) in new_ignores.into_iter().rev() {
2072 if ignore_stack.is_path_ignored(&parent_path, true) {
2073 ignore_stack = IgnoreStack::all();
2074 break;
2075 } else if let Some(ignore) = ignore {
2076 ignore_stack = ignore_stack.append(Arc::from(parent_path), ignore);
2077 }
2078 }
2079
2080 if ignore_stack.is_path_ignored(path, is_dir) {
2081 ignore_stack = IgnoreStack::all();
2082 }
2083
2084 ignore_stack
2085 }
2086}
2087
2088impl fmt::Debug for Snapshot {
2089 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2090 for entry in self.entries_by_path.cursor::<()>() {
2091 for _ in entry.path.ancestors().skip(1) {
2092 write!(f, " ")?;
2093 }
2094 writeln!(f, "{:?} (inode: {})", entry.path, entry.inode)?;
2095 }
2096 Ok(())
2097 }
2098}
2099
2100#[derive(Clone, PartialEq)]
2101pub struct File {
2102 entry_id: Option<usize>,
2103 worktree: ModelHandle<Worktree>,
2104 worktree_path: Arc<Path>,
2105 pub path: Arc<Path>,
2106 pub mtime: SystemTime,
2107 is_local: bool,
2108}
2109
2110impl language::File for File {
2111 fn worktree_id(&self) -> usize {
2112 self.worktree.id()
2113 }
2114
2115 fn entry_id(&self) -> Option<usize> {
2116 self.entry_id
2117 }
2118
2119 fn mtime(&self) -> SystemTime {
2120 self.mtime
2121 }
2122
2123 fn path(&self) -> &Arc<Path> {
2124 &self.path
2125 }
2126
2127 fn abs_path(&self) -> Option<PathBuf> {
2128 if self.is_local {
2129 Some(self.worktree_path.join(&self.path))
2130 } else {
2131 None
2132 }
2133 }
2134
2135 fn full_path(&self) -> PathBuf {
2136 let mut full_path = PathBuf::new();
2137 if let Some(worktree_name) = self.worktree_path.file_name() {
2138 full_path.push(worktree_name);
2139 }
2140 full_path.push(&self.path);
2141 full_path
2142 }
2143
2144 /// Returns the last component of this handle's absolute path. If this handle refers to the root
2145 /// of its worktree, then this method will return the name of the worktree itself.
2146 fn file_name<'a>(&'a self) -> Option<OsString> {
2147 self.path
2148 .file_name()
2149 .or_else(|| self.worktree_path.file_name())
2150 .map(Into::into)
2151 }
2152
2153 fn is_deleted(&self) -> bool {
2154 self.entry_id.is_none()
2155 }
2156
2157 fn save(
2158 &self,
2159 buffer_id: u64,
2160 text: Rope,
2161 version: clock::Global,
2162 cx: &mut MutableAppContext,
2163 ) -> Task<Result<(clock::Global, SystemTime)>> {
2164 self.worktree.update(cx, |worktree, cx| match worktree {
2165 Worktree::Local(worktree) => {
2166 let rpc = worktree.client.clone();
2167 let worktree_id = *worktree.remote_id.borrow();
2168 let save = worktree.save(self.path.clone(), text, cx);
2169 cx.background().spawn(async move {
2170 let entry = save.await?;
2171 if let Some(worktree_id) = worktree_id {
2172 rpc.send(proto::BufferSaved {
2173 worktree_id,
2174 buffer_id,
2175 version: (&version).into(),
2176 mtime: Some(entry.mtime.into()),
2177 })
2178 .await?;
2179 }
2180 Ok((version, entry.mtime))
2181 })
2182 }
2183 Worktree::Remote(worktree) => {
2184 let rpc = worktree.client.clone();
2185 let worktree_id = worktree.remote_id;
2186 cx.foreground().spawn(async move {
2187 let response = rpc
2188 .request(proto::SaveBuffer {
2189 worktree_id,
2190 buffer_id,
2191 })
2192 .await?;
2193 let version = response.version.try_into()?;
2194 let mtime = response
2195 .mtime
2196 .ok_or_else(|| anyhow!("missing mtime"))?
2197 .into();
2198 Ok((version, mtime))
2199 })
2200 }
2201 })
2202 }
2203
2204 fn load_local(&self, cx: &AppContext) -> Option<Task<Result<String>>> {
2205 let worktree = self.worktree.read(cx).as_local()?;
2206 let abs_path = worktree.absolutize(&self.path);
2207 let fs = worktree.fs.clone();
2208 Some(
2209 cx.background()
2210 .spawn(async move { fs.load(&abs_path).await }),
2211 )
2212 }
2213
2214 fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) {
2215 self.worktree.update(cx, |worktree, cx| {
2216 worktree.send_buffer_update(buffer_id, operation, cx);
2217 });
2218 }
2219
2220 fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) {
2221 self.worktree.update(cx, |worktree, cx| {
2222 if let Worktree::Remote(worktree) = worktree {
2223 let worktree_id = worktree.remote_id;
2224 let rpc = worktree.client.clone();
2225 cx.background()
2226 .spawn(async move {
2227 if let Err(error) = rpc
2228 .send(proto::CloseBuffer {
2229 worktree_id,
2230 buffer_id,
2231 })
2232 .await
2233 {
2234 log::error!("error closing remote buffer: {}", error);
2235 }
2236 })
2237 .detach();
2238 }
2239 });
2240 }
2241
2242 fn boxed_clone(&self) -> Box<dyn language::File> {
2243 Box::new(self.clone())
2244 }
2245
2246 fn as_any(&self) -> &dyn Any {
2247 self
2248 }
2249}
2250
2251#[derive(Clone, Debug)]
2252pub struct Entry {
2253 pub id: usize,
2254 pub kind: EntryKind,
2255 pub path: Arc<Path>,
2256 pub inode: u64,
2257 pub mtime: SystemTime,
2258 pub is_symlink: bool,
2259 pub is_ignored: bool,
2260}
2261
2262#[derive(Clone, Debug)]
2263pub enum EntryKind {
2264 PendingDir,
2265 Dir,
2266 File(CharBag),
2267}
2268
2269impl Entry {
2270 fn new(
2271 path: Arc<Path>,
2272 metadata: &fs::Metadata,
2273 next_entry_id: &AtomicUsize,
2274 root_char_bag: CharBag,
2275 ) -> Self {
2276 Self {
2277 id: next_entry_id.fetch_add(1, SeqCst),
2278 kind: if metadata.is_dir {
2279 EntryKind::PendingDir
2280 } else {
2281 EntryKind::File(char_bag_for_path(root_char_bag, &path))
2282 },
2283 path,
2284 inode: metadata.inode,
2285 mtime: metadata.mtime,
2286 is_symlink: metadata.is_symlink,
2287 is_ignored: false,
2288 }
2289 }
2290
2291 pub fn is_dir(&self) -> bool {
2292 matches!(self.kind, EntryKind::Dir | EntryKind::PendingDir)
2293 }
2294
2295 pub fn is_file(&self) -> bool {
2296 matches!(self.kind, EntryKind::File(_))
2297 }
2298}
2299
2300impl sum_tree::Item for Entry {
2301 type Summary = EntrySummary;
2302
2303 fn summary(&self) -> Self::Summary {
2304 let visible_count = if self.is_ignored { 0 } else { 1 };
2305 let file_count;
2306 let visible_file_count;
2307 if self.is_file() {
2308 file_count = 1;
2309 visible_file_count = visible_count;
2310 } else {
2311 file_count = 0;
2312 visible_file_count = 0;
2313 }
2314
2315 EntrySummary {
2316 max_path: self.path.clone(),
2317 count: 1,
2318 visible_count,
2319 file_count,
2320 visible_file_count,
2321 }
2322 }
2323}
2324
2325impl sum_tree::KeyedItem for Entry {
2326 type Key = PathKey;
2327
2328 fn key(&self) -> Self::Key {
2329 PathKey(self.path.clone())
2330 }
2331}
2332
2333#[derive(Clone, Debug)]
2334pub struct EntrySummary {
2335 max_path: Arc<Path>,
2336 count: usize,
2337 visible_count: usize,
2338 file_count: usize,
2339 visible_file_count: usize,
2340}
2341
2342impl Default for EntrySummary {
2343 fn default() -> Self {
2344 Self {
2345 max_path: Arc::from(Path::new("")),
2346 count: 0,
2347 visible_count: 0,
2348 file_count: 0,
2349 visible_file_count: 0,
2350 }
2351 }
2352}
2353
2354impl sum_tree::Summary for EntrySummary {
2355 type Context = ();
2356
2357 fn add_summary(&mut self, rhs: &Self, _: &()) {
2358 self.max_path = rhs.max_path.clone();
2359 self.visible_count += rhs.visible_count;
2360 self.file_count += rhs.file_count;
2361 self.visible_file_count += rhs.visible_file_count;
2362 }
2363}
2364
2365#[derive(Clone, Debug)]
2366struct PathEntry {
2367 id: usize,
2368 path: Arc<Path>,
2369 is_ignored: bool,
2370 scan_id: usize,
2371}
2372
2373impl sum_tree::Item for PathEntry {
2374 type Summary = PathEntrySummary;
2375
2376 fn summary(&self) -> Self::Summary {
2377 PathEntrySummary { max_id: self.id }
2378 }
2379}
2380
2381impl sum_tree::KeyedItem for PathEntry {
2382 type Key = usize;
2383
2384 fn key(&self) -> Self::Key {
2385 self.id
2386 }
2387}
2388
2389#[derive(Clone, Debug, Default)]
2390struct PathEntrySummary {
2391 max_id: usize,
2392}
2393
2394impl sum_tree::Summary for PathEntrySummary {
2395 type Context = ();
2396
2397 fn add_summary(&mut self, summary: &Self, _: &Self::Context) {
2398 self.max_id = summary.max_id;
2399 }
2400}
2401
2402impl<'a> sum_tree::Dimension<'a, PathEntrySummary> for usize {
2403 fn add_summary(&mut self, summary: &'a PathEntrySummary, _: &()) {
2404 *self = summary.max_id;
2405 }
2406}
2407
2408#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
2409pub struct PathKey(Arc<Path>);
2410
2411impl Default for PathKey {
2412 fn default() -> Self {
2413 Self(Path::new("").into())
2414 }
2415}
2416
2417impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
2418 fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
2419 self.0 = summary.max_path.clone();
2420 }
2421}
2422
2423struct BackgroundScanner {
2424 fs: Arc<dyn Fs>,
2425 snapshot: Arc<Mutex<Snapshot>>,
2426 notify: Sender<ScanState>,
2427 executor: Arc<executor::Background>,
2428}
2429
2430impl BackgroundScanner {
2431 fn new(
2432 snapshot: Arc<Mutex<Snapshot>>,
2433 notify: Sender<ScanState>,
2434 fs: Arc<dyn Fs>,
2435 executor: Arc<executor::Background>,
2436 ) -> Self {
2437 Self {
2438 fs,
2439 snapshot,
2440 notify,
2441 executor,
2442 }
2443 }
2444
2445 fn abs_path(&self) -> Arc<Path> {
2446 self.snapshot.lock().abs_path.clone()
2447 }
2448
2449 fn snapshot(&self) -> Snapshot {
2450 self.snapshot.lock().clone()
2451 }
2452
2453 async fn run(mut self, events_rx: impl Stream<Item = Vec<fsevent::Event>>) {
2454 if self.notify.send(ScanState::Scanning).await.is_err() {
2455 return;
2456 }
2457
2458 if let Err(err) = self.scan_dirs().await {
2459 if self
2460 .notify
2461 .send(ScanState::Err(Arc::new(err)))
2462 .await
2463 .is_err()
2464 {
2465 return;
2466 }
2467 }
2468
2469 if self.notify.send(ScanState::Idle).await.is_err() {
2470 return;
2471 }
2472
2473 futures::pin_mut!(events_rx);
2474 while let Some(events) = events_rx.next().await {
2475 if self.notify.send(ScanState::Scanning).await.is_err() {
2476 break;
2477 }
2478
2479 if !self.process_events(events).await {
2480 break;
2481 }
2482
2483 if self.notify.send(ScanState::Idle).await.is_err() {
2484 break;
2485 }
2486 }
2487 }
2488
2489 async fn scan_dirs(&mut self) -> Result<()> {
2490 let root_char_bag;
2491 let next_entry_id;
2492 let is_dir;
2493 {
2494 let snapshot = self.snapshot.lock();
2495 root_char_bag = snapshot.root_char_bag;
2496 next_entry_id = snapshot.next_entry_id.clone();
2497 is_dir = snapshot.root_entry().map_or(false, |e| e.is_dir())
2498 };
2499
2500 if is_dir {
2501 let path: Arc<Path> = Arc::from(Path::new(""));
2502 let abs_path = self.abs_path();
2503 let (tx, rx) = channel::unbounded();
2504 tx.send(ScanJob {
2505 abs_path: abs_path.to_path_buf(),
2506 path,
2507 ignore_stack: IgnoreStack::none(),
2508 scan_queue: tx.clone(),
2509 })
2510 .await
2511 .unwrap();
2512 drop(tx);
2513
2514 self.executor
2515 .scoped(|scope| {
2516 for _ in 0..self.executor.num_cpus() {
2517 scope.spawn(async {
2518 while let Ok(job) = rx.recv().await {
2519 if let Err(err) = self
2520 .scan_dir(root_char_bag, next_entry_id.clone(), &job)
2521 .await
2522 {
2523 log::error!("error scanning {:?}: {}", job.abs_path, err);
2524 }
2525 }
2526 });
2527 }
2528 })
2529 .await;
2530 }
2531
2532 Ok(())
2533 }
2534
2535 async fn scan_dir(
2536 &self,
2537 root_char_bag: CharBag,
2538 next_entry_id: Arc<AtomicUsize>,
2539 job: &ScanJob,
2540 ) -> Result<()> {
2541 let mut new_entries: Vec<Entry> = Vec::new();
2542 let mut new_jobs: Vec<ScanJob> = Vec::new();
2543 let mut ignore_stack = job.ignore_stack.clone();
2544 let mut new_ignore = None;
2545
2546 let mut child_paths = self.fs.read_dir(&job.abs_path).await?;
2547 while let Some(child_abs_path) = child_paths.next().await {
2548 let child_abs_path = match child_abs_path {
2549 Ok(child_abs_path) => child_abs_path,
2550 Err(error) => {
2551 log::error!("error processing entry {:?}", error);
2552 continue;
2553 }
2554 };
2555 let child_name = child_abs_path.file_name().unwrap();
2556 let child_path: Arc<Path> = job.path.join(child_name).into();
2557 let child_metadata = match self.fs.metadata(&child_abs_path).await? {
2558 Some(metadata) => metadata,
2559 None => continue,
2560 };
2561
2562 // If we find a .gitignore, add it to the stack of ignores used to determine which paths are ignored
2563 if child_name == *GITIGNORE {
2564 match build_gitignore(&child_abs_path, self.fs.as_ref()) {
2565 Ok(ignore) => {
2566 let ignore = Arc::new(ignore);
2567 ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
2568 new_ignore = Some(ignore);
2569 }
2570 Err(error) => {
2571 log::error!(
2572 "error loading .gitignore file {:?} - {:?}",
2573 child_name,
2574 error
2575 );
2576 }
2577 }
2578
2579 // Update ignore status of any child entries we've already processed to reflect the
2580 // ignore file in the current directory. Because `.gitignore` starts with a `.`,
2581 // there should rarely be too numerous. Update the ignore stack associated with any
2582 // new jobs as well.
2583 let mut new_jobs = new_jobs.iter_mut();
2584 for entry in &mut new_entries {
2585 entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
2586 if entry.is_dir() {
2587 new_jobs.next().unwrap().ignore_stack = if entry.is_ignored {
2588 IgnoreStack::all()
2589 } else {
2590 ignore_stack.clone()
2591 };
2592 }
2593 }
2594 }
2595
2596 let mut child_entry = Entry::new(
2597 child_path.clone(),
2598 &child_metadata,
2599 &next_entry_id,
2600 root_char_bag,
2601 );
2602
2603 if child_metadata.is_dir {
2604 let is_ignored = ignore_stack.is_path_ignored(&child_path, true);
2605 child_entry.is_ignored = is_ignored;
2606 new_entries.push(child_entry);
2607 new_jobs.push(ScanJob {
2608 abs_path: child_abs_path,
2609 path: child_path,
2610 ignore_stack: if is_ignored {
2611 IgnoreStack::all()
2612 } else {
2613 ignore_stack.clone()
2614 },
2615 scan_queue: job.scan_queue.clone(),
2616 });
2617 } else {
2618 child_entry.is_ignored = ignore_stack.is_path_ignored(&child_path, false);
2619 new_entries.push(child_entry);
2620 };
2621 }
2622
2623 self.snapshot
2624 .lock()
2625 .populate_dir(job.path.clone(), new_entries, new_ignore);
2626 for new_job in new_jobs {
2627 job.scan_queue.send(new_job).await.unwrap();
2628 }
2629
2630 Ok(())
2631 }
2632
2633 async fn process_events(&mut self, mut events: Vec<fsevent::Event>) -> bool {
2634 let mut snapshot = self.snapshot();
2635 snapshot.scan_id += 1;
2636
2637 let root_abs_path = if let Ok(abs_path) = self.fs.canonicalize(&snapshot.abs_path).await {
2638 abs_path
2639 } else {
2640 return false;
2641 };
2642 let root_char_bag = snapshot.root_char_bag;
2643 let next_entry_id = snapshot.next_entry_id.clone();
2644
2645 events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
2646 events.dedup_by(|a, b| a.path.starts_with(&b.path));
2647
2648 for event in &events {
2649 match event.path.strip_prefix(&root_abs_path) {
2650 Ok(path) => snapshot.remove_path(&path),
2651 Err(_) => {
2652 log::error!(
2653 "unexpected event {:?} for root path {:?}",
2654 event.path,
2655 root_abs_path
2656 );
2657 continue;
2658 }
2659 }
2660 }
2661
2662 let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
2663 for event in events {
2664 let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) {
2665 Ok(path) => Arc::from(path.to_path_buf()),
2666 Err(_) => {
2667 log::error!(
2668 "unexpected event {:?} for root path {:?}",
2669 event.path,
2670 root_abs_path
2671 );
2672 continue;
2673 }
2674 };
2675
2676 match self.fs.metadata(&event.path).await {
2677 Ok(Some(metadata)) => {
2678 let ignore_stack = snapshot.ignore_stack_for_path(&path, metadata.is_dir);
2679 let mut fs_entry = Entry::new(
2680 path.clone(),
2681 &metadata,
2682 snapshot.next_entry_id.as_ref(),
2683 snapshot.root_char_bag,
2684 );
2685 fs_entry.is_ignored = ignore_stack.is_all();
2686 snapshot.insert_entry(fs_entry, self.fs.as_ref());
2687 if metadata.is_dir {
2688 scan_queue_tx
2689 .send(ScanJob {
2690 abs_path: event.path,
2691 path,
2692 ignore_stack,
2693 scan_queue: scan_queue_tx.clone(),
2694 })
2695 .await
2696 .unwrap();
2697 }
2698 }
2699 Ok(None) => {}
2700 Err(err) => {
2701 // TODO - create a special 'error' entry in the entries tree to mark this
2702 log::error!("error reading file on event {:?}", err);
2703 }
2704 }
2705 }
2706
2707 *self.snapshot.lock() = snapshot;
2708
2709 // Scan any directories that were created as part of this event batch.
2710 drop(scan_queue_tx);
2711 self.executor
2712 .scoped(|scope| {
2713 for _ in 0..self.executor.num_cpus() {
2714 scope.spawn(async {
2715 while let Ok(job) = scan_queue_rx.recv().await {
2716 if let Err(err) = self
2717 .scan_dir(root_char_bag, next_entry_id.clone(), &job)
2718 .await
2719 {
2720 log::error!("error scanning {:?}: {}", job.abs_path, err);
2721 }
2722 }
2723 });
2724 }
2725 })
2726 .await;
2727
2728 // Attempt to detect renames only over a single batch of file-system events.
2729 self.snapshot.lock().removed_entry_ids.clear();
2730
2731 self.update_ignore_statuses().await;
2732 true
2733 }
2734
2735 async fn update_ignore_statuses(&self) {
2736 let mut snapshot = self.snapshot();
2737
2738 let mut ignores_to_update = Vec::new();
2739 let mut ignores_to_delete = Vec::new();
2740 for (parent_path, (_, scan_id)) in &snapshot.ignores {
2741 if *scan_id == snapshot.scan_id && snapshot.entry_for_path(parent_path).is_some() {
2742 ignores_to_update.push(parent_path.clone());
2743 }
2744
2745 let ignore_path = parent_path.join(&*GITIGNORE);
2746 if snapshot.entry_for_path(ignore_path).is_none() {
2747 ignores_to_delete.push(parent_path.clone());
2748 }
2749 }
2750
2751 for parent_path in ignores_to_delete {
2752 snapshot.ignores.remove(&parent_path);
2753 self.snapshot.lock().ignores.remove(&parent_path);
2754 }
2755
2756 let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
2757 ignores_to_update.sort_unstable();
2758 let mut ignores_to_update = ignores_to_update.into_iter().peekable();
2759 while let Some(parent_path) = ignores_to_update.next() {
2760 while ignores_to_update
2761 .peek()
2762 .map_or(false, |p| p.starts_with(&parent_path))
2763 {
2764 ignores_to_update.next().unwrap();
2765 }
2766
2767 let ignore_stack = snapshot.ignore_stack_for_path(&parent_path, true);
2768 ignore_queue_tx
2769 .send(UpdateIgnoreStatusJob {
2770 path: parent_path,
2771 ignore_stack,
2772 ignore_queue: ignore_queue_tx.clone(),
2773 })
2774 .await
2775 .unwrap();
2776 }
2777 drop(ignore_queue_tx);
2778
2779 self.executor
2780 .scoped(|scope| {
2781 for _ in 0..self.executor.num_cpus() {
2782 scope.spawn(async {
2783 while let Ok(job) = ignore_queue_rx.recv().await {
2784 self.update_ignore_status(job, &snapshot).await;
2785 }
2786 });
2787 }
2788 })
2789 .await;
2790 }
2791
2792 async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) {
2793 let mut ignore_stack = job.ignore_stack;
2794 if let Some((ignore, _)) = snapshot.ignores.get(&job.path) {
2795 ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
2796 }
2797
2798 let mut entries_by_id_edits = Vec::new();
2799 let mut entries_by_path_edits = Vec::new();
2800 for mut entry in snapshot.child_entries(&job.path).cloned() {
2801 let was_ignored = entry.is_ignored;
2802 entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
2803 if entry.is_dir() {
2804 let child_ignore_stack = if entry.is_ignored {
2805 IgnoreStack::all()
2806 } else {
2807 ignore_stack.clone()
2808 };
2809 job.ignore_queue
2810 .send(UpdateIgnoreStatusJob {
2811 path: entry.path.clone(),
2812 ignore_stack: child_ignore_stack,
2813 ignore_queue: job.ignore_queue.clone(),
2814 })
2815 .await
2816 .unwrap();
2817 }
2818
2819 if entry.is_ignored != was_ignored {
2820 let mut path_entry = snapshot.entries_by_id.get(&entry.id, &()).unwrap().clone();
2821 path_entry.scan_id = snapshot.scan_id;
2822 path_entry.is_ignored = entry.is_ignored;
2823 entries_by_id_edits.push(Edit::Insert(path_entry));
2824 entries_by_path_edits.push(Edit::Insert(entry));
2825 }
2826 }
2827
2828 let mut snapshot = self.snapshot.lock();
2829 snapshot.entries_by_path.edit(entries_by_path_edits, &());
2830 snapshot.entries_by_id.edit(entries_by_id_edits, &());
2831 }
2832}
2833
2834async fn refresh_entry(
2835 fs: &dyn Fs,
2836 snapshot: &Mutex<Snapshot>,
2837 path: Arc<Path>,
2838 abs_path: &Path,
2839) -> Result<Entry> {
2840 let root_char_bag;
2841 let next_entry_id;
2842 {
2843 let snapshot = snapshot.lock();
2844 root_char_bag = snapshot.root_char_bag;
2845 next_entry_id = snapshot.next_entry_id.clone();
2846 }
2847 let entry = Entry::new(
2848 path,
2849 &fs.metadata(abs_path)
2850 .await?
2851 .ok_or_else(|| anyhow!("could not read saved file metadata"))?,
2852 &next_entry_id,
2853 root_char_bag,
2854 );
2855 Ok(snapshot.lock().insert_entry(entry, fs))
2856}
2857
2858fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
2859 let mut result = root_char_bag;
2860 result.extend(
2861 path.to_string_lossy()
2862 .chars()
2863 .map(|c| c.to_ascii_lowercase()),
2864 );
2865 result
2866}
2867
2868struct ScanJob {
2869 abs_path: PathBuf,
2870 path: Arc<Path>,
2871 ignore_stack: Arc<IgnoreStack>,
2872 scan_queue: Sender<ScanJob>,
2873}
2874
2875struct UpdateIgnoreStatusJob {
2876 path: Arc<Path>,
2877 ignore_stack: Arc<IgnoreStack>,
2878 ignore_queue: Sender<UpdateIgnoreStatusJob>,
2879}
2880
2881pub trait WorktreeHandle {
2882 #[cfg(test)]
2883 fn flush_fs_events<'a>(
2884 &self,
2885 cx: &'a gpui::TestAppContext,
2886 ) -> futures::future::LocalBoxFuture<'a, ()>;
2887}
2888
2889impl WorktreeHandle for ModelHandle<Worktree> {
2890 // When the worktree's FS event stream sometimes delivers "redundant" events for FS changes that
2891 // occurred before the worktree was constructed. These events can cause the worktree to perfrom
2892 // extra directory scans, and emit extra scan-state notifications.
2893 //
2894 // This function mutates the worktree's directory and waits for those mutations to be picked up,
2895 // to ensure that all redundant FS events have already been processed.
2896 #[cfg(test)]
2897 fn flush_fs_events<'a>(
2898 &self,
2899 cx: &'a gpui::TestAppContext,
2900 ) -> futures::future::LocalBoxFuture<'a, ()> {
2901 use smol::future::FutureExt;
2902
2903 let filename = "fs-event-sentinel";
2904 let root_path = cx.read(|cx| self.read(cx).abs_path.clone());
2905 let tree = self.clone();
2906 async move {
2907 std::fs::write(root_path.join(filename), "").unwrap();
2908 tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_some())
2909 .await;
2910
2911 std::fs::remove_file(root_path.join(filename)).unwrap();
2912 tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_none())
2913 .await;
2914
2915 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
2916 .await;
2917 }
2918 .boxed_local()
2919 }
2920}
2921
2922#[derive(Clone, Debug)]
2923struct TraversalProgress<'a> {
2924 max_path: &'a Path,
2925 count: usize,
2926 visible_count: usize,
2927 file_count: usize,
2928 visible_file_count: usize,
2929}
2930
2931impl<'a> TraversalProgress<'a> {
2932 fn count(&self, include_dirs: bool, include_ignored: bool) -> usize {
2933 match (include_ignored, include_dirs) {
2934 (true, true) => self.count,
2935 (true, false) => self.file_count,
2936 (false, true) => self.visible_count,
2937 (false, false) => self.visible_file_count,
2938 }
2939 }
2940}
2941
2942impl<'a> sum_tree::Dimension<'a, EntrySummary> for TraversalProgress<'a> {
2943 fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
2944 self.max_path = summary.max_path.as_ref();
2945 self.count += summary.count;
2946 self.visible_count += summary.visible_count;
2947 self.file_count += summary.file_count;
2948 self.visible_file_count += summary.visible_file_count;
2949 }
2950}
2951
2952impl<'a> Default for TraversalProgress<'a> {
2953 fn default() -> Self {
2954 Self {
2955 max_path: Path::new(""),
2956 count: 0,
2957 visible_count: 0,
2958 file_count: 0,
2959 visible_file_count: 0,
2960 }
2961 }
2962}
2963
2964pub struct Traversal<'a> {
2965 cursor: sum_tree::Cursor<'a, Entry, TraversalProgress<'a>>,
2966 include_ignored: bool,
2967 include_dirs: bool,
2968}
2969
2970impl<'a> Traversal<'a> {
2971 pub fn advance(&mut self) -> bool {
2972 self.advance_to_offset(self.offset() + 1)
2973 }
2974
2975 pub fn advance_to_offset(&mut self, offset: usize) -> bool {
2976 self.cursor.seek_forward(
2977 &TraversalTarget::Count {
2978 count: offset,
2979 include_dirs: self.include_dirs,
2980 include_ignored: self.include_ignored,
2981 },
2982 Bias::Right,
2983 &(),
2984 )
2985 }
2986
2987 pub fn advance_to_sibling(&mut self) -> bool {
2988 while let Some(entry) = self.cursor.item() {
2989 self.cursor.seek_forward(
2990 &TraversalTarget::PathSuccessor(&entry.path),
2991 Bias::Left,
2992 &(),
2993 );
2994 if let Some(entry) = self.cursor.item() {
2995 if (self.include_dirs || !entry.is_dir())
2996 && (self.include_ignored || !entry.is_ignored)
2997 {
2998 return true;
2999 }
3000 }
3001 }
3002 false
3003 }
3004
3005 pub fn entry(&self) -> Option<&'a Entry> {
3006 self.cursor.item()
3007 }
3008
3009 pub fn offset(&self) -> usize {
3010 self.cursor
3011 .start()
3012 .count(self.include_dirs, self.include_ignored)
3013 }
3014}
3015
3016impl<'a> Iterator for Traversal<'a> {
3017 type Item = &'a Entry;
3018
3019 fn next(&mut self) -> Option<Self::Item> {
3020 if let Some(item) = self.entry() {
3021 self.advance();
3022 Some(item)
3023 } else {
3024 None
3025 }
3026 }
3027}
3028
3029#[derive(Debug)]
3030enum TraversalTarget<'a> {
3031 Path(&'a Path),
3032 PathSuccessor(&'a Path),
3033 Count {
3034 count: usize,
3035 include_ignored: bool,
3036 include_dirs: bool,
3037 },
3038}
3039
3040impl<'a, 'b> SeekTarget<'a, EntrySummary, TraversalProgress<'a>> for TraversalTarget<'b> {
3041 fn cmp(&self, cursor_location: &TraversalProgress<'a>, _: &()) -> Ordering {
3042 match self {
3043 TraversalTarget::Path(path) => path.cmp(&cursor_location.max_path),
3044 TraversalTarget::PathSuccessor(path) => {
3045 if !cursor_location.max_path.starts_with(path) {
3046 Ordering::Equal
3047 } else {
3048 Ordering::Greater
3049 }
3050 }
3051 TraversalTarget::Count {
3052 count,
3053 include_dirs,
3054 include_ignored,
3055 } => Ord::cmp(
3056 count,
3057 &cursor_location.count(*include_dirs, *include_ignored),
3058 ),
3059 }
3060 }
3061}
3062
3063struct ChildEntriesIter<'a> {
3064 parent_path: &'a Path,
3065 traversal: Traversal<'a>,
3066}
3067
3068impl<'a> Iterator for ChildEntriesIter<'a> {
3069 type Item = &'a Entry;
3070
3071 fn next(&mut self) -> Option<Self::Item> {
3072 if let Some(item) = self.traversal.entry() {
3073 if item.path.starts_with(&self.parent_path) {
3074 self.traversal.advance_to_sibling();
3075 return Some(item);
3076 }
3077 }
3078 None
3079 }
3080}
3081
3082impl<'a> From<&'a Entry> for proto::Entry {
3083 fn from(entry: &'a Entry) -> Self {
3084 Self {
3085 id: entry.id as u64,
3086 is_dir: entry.is_dir(),
3087 path: entry.path.to_string_lossy().to_string(),
3088 inode: entry.inode,
3089 mtime: Some(entry.mtime.into()),
3090 is_symlink: entry.is_symlink,
3091 is_ignored: entry.is_ignored,
3092 }
3093 }
3094}
3095
3096impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
3097 type Error = anyhow::Error;
3098
3099 fn try_from((root_char_bag, entry): (&'a CharBag, proto::Entry)) -> Result<Self> {
3100 if let Some(mtime) = entry.mtime {
3101 let kind = if entry.is_dir {
3102 EntryKind::Dir
3103 } else {
3104 let mut char_bag = root_char_bag.clone();
3105 char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase()));
3106 EntryKind::File(char_bag)
3107 };
3108 let path: Arc<Path> = Arc::from(Path::new(&entry.path));
3109 Ok(Entry {
3110 id: entry.id as usize,
3111 kind,
3112 path: path.clone(),
3113 inode: entry.inode,
3114 mtime: mtime.into(),
3115 is_symlink: entry.is_symlink,
3116 is_ignored: entry.is_ignored,
3117 })
3118 } else {
3119 Err(anyhow!(
3120 "missing mtime in remote worktree entry {:?}",
3121 entry.path
3122 ))
3123 }
3124 }
3125}
3126
3127trait ToPointUtf16 {
3128 fn to_point_utf16(self) -> PointUtf16;
3129}
3130
3131impl ToPointUtf16 for lsp::Position {
3132 fn to_point_utf16(self) -> PointUtf16 {
3133 PointUtf16::new(self.line, self.character)
3134 }
3135}
3136
3137fn diagnostic_ranges<'a>(
3138 diagnostic: &'a lsp::Diagnostic,
3139 abs_path: &'a Path,
3140) -> impl 'a + Iterator<Item = Range<PointUtf16>> {
3141 diagnostic
3142 .related_information
3143 .iter()
3144 .flatten()
3145 .filter_map(move |info| {
3146 if info.location.uri.to_file_path().ok()? == abs_path {
3147 let info_start = PointUtf16::new(
3148 info.location.range.start.line,
3149 info.location.range.start.character,
3150 );
3151 let info_end = PointUtf16::new(
3152 info.location.range.end.line,
3153 info.location.range.end.character,
3154 );
3155 Some(info_start..info_end)
3156 } else {
3157 None
3158 }
3159 })
3160 .chain(Some(
3161 diagnostic.range.start.to_point_utf16()..diagnostic.range.end.to_point_utf16(),
3162 ))
3163}
3164
3165#[cfg(test)]
3166mod tests {
3167 use super::*;
3168 use crate::fs::FakeFs;
3169 use anyhow::Result;
3170 use client::test::{FakeHttpClient, FakeServer};
3171 use fs::RealFs;
3172 use language::{tree_sitter_rust, DiagnosticEntry, LanguageServerConfig};
3173 use language::{Diagnostic, LanguageConfig};
3174 use lsp::Url;
3175 use rand::prelude::*;
3176 use serde_json::json;
3177 use std::{cell::RefCell, rc::Rc};
3178 use std::{
3179 env,
3180 fmt::Write,
3181 time::{SystemTime, UNIX_EPOCH},
3182 };
3183 use text::Point;
3184 use unindent::Unindent as _;
3185 use util::test::temp_tree;
3186
3187 #[gpui::test]
3188 async fn test_traversal(mut cx: gpui::TestAppContext) {
3189 let fs = FakeFs::new();
3190 fs.insert_tree(
3191 "/root",
3192 json!({
3193 ".gitignore": "a/b\n",
3194 "a": {
3195 "b": "",
3196 "c": "",
3197 }
3198 }),
3199 )
3200 .await;
3201
3202 let client = Client::new();
3203 let http_client = FakeHttpClient::with_404_response();
3204 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3205
3206 let tree = Worktree::open_local(
3207 client,
3208 user_store,
3209 Arc::from(Path::new("/root")),
3210 Arc::new(fs),
3211 Default::default(),
3212 &mut cx.to_async(),
3213 )
3214 .await
3215 .unwrap();
3216 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3217 .await;
3218
3219 tree.read_with(&cx, |tree, _| {
3220 assert_eq!(
3221 tree.entries(false)
3222 .map(|entry| entry.path.as_ref())
3223 .collect::<Vec<_>>(),
3224 vec![
3225 Path::new(""),
3226 Path::new(".gitignore"),
3227 Path::new("a"),
3228 Path::new("a/c"),
3229 ]
3230 );
3231 })
3232 }
3233
3234 #[gpui::test]
3235 async fn test_save_file(mut cx: gpui::TestAppContext) {
3236 let dir = temp_tree(json!({
3237 "file1": "the old contents",
3238 }));
3239
3240 let client = Client::new();
3241 let http_client = FakeHttpClient::with_404_response();
3242 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3243
3244 let tree = Worktree::open_local(
3245 client,
3246 user_store,
3247 dir.path(),
3248 Arc::new(RealFs),
3249 Default::default(),
3250 &mut cx.to_async(),
3251 )
3252 .await
3253 .unwrap();
3254 let buffer = tree
3255 .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
3256 .await
3257 .unwrap();
3258 let save = buffer.update(&mut cx, |buffer, cx| {
3259 buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
3260 buffer.save(cx).unwrap()
3261 });
3262 save.await.unwrap();
3263
3264 let new_text = std::fs::read_to_string(dir.path().join("file1")).unwrap();
3265 assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
3266 }
3267
3268 #[gpui::test]
3269 async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) {
3270 let dir = temp_tree(json!({
3271 "file1": "the old contents",
3272 }));
3273 let file_path = dir.path().join("file1");
3274
3275 let client = Client::new();
3276 let http_client = FakeHttpClient::with_404_response();
3277 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3278
3279 let tree = Worktree::open_local(
3280 client,
3281 user_store,
3282 file_path.clone(),
3283 Arc::new(RealFs),
3284 Default::default(),
3285 &mut cx.to_async(),
3286 )
3287 .await
3288 .unwrap();
3289 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3290 .await;
3291 cx.read(|cx| assert_eq!(tree.read(cx).file_count(), 1));
3292
3293 let buffer = tree
3294 .update(&mut cx, |tree, cx| tree.open_buffer("", cx))
3295 .await
3296 .unwrap();
3297 let save = buffer.update(&mut cx, |buffer, cx| {
3298 buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
3299 buffer.save(cx).unwrap()
3300 });
3301 save.await.unwrap();
3302
3303 let new_text = std::fs::read_to_string(file_path).unwrap();
3304 assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
3305 }
3306
3307 #[gpui::test]
3308 async fn test_rescan_and_remote_updates(mut cx: gpui::TestAppContext) {
3309 let dir = temp_tree(json!({
3310 "a": {
3311 "file1": "",
3312 "file2": "",
3313 "file3": "",
3314 },
3315 "b": {
3316 "c": {
3317 "file4": "",
3318 "file5": "",
3319 }
3320 }
3321 }));
3322
3323 let user_id = 5;
3324 let mut client = Client::new();
3325 let server = FakeServer::for_client(user_id, &mut client, &cx).await;
3326 let user_store = server.build_user_store(client.clone(), &mut cx).await;
3327 let tree = Worktree::open_local(
3328 client,
3329 user_store.clone(),
3330 dir.path(),
3331 Arc::new(RealFs),
3332 Default::default(),
3333 &mut cx.to_async(),
3334 )
3335 .await
3336 .unwrap();
3337
3338 let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| {
3339 let buffer = tree.update(cx, |tree, cx| tree.open_buffer(path, cx));
3340 async move { buffer.await.unwrap() }
3341 };
3342 let id_for_path = |path: &'static str, cx: &gpui::TestAppContext| {
3343 tree.read_with(cx, |tree, _| {
3344 tree.entry_for_path(path)
3345 .expect(&format!("no entry for path {}", path))
3346 .id
3347 })
3348 };
3349
3350 let buffer2 = buffer_for_path("a/file2", &mut cx).await;
3351 let buffer3 = buffer_for_path("a/file3", &mut cx).await;
3352 let buffer4 = buffer_for_path("b/c/file4", &mut cx).await;
3353 let buffer5 = buffer_for_path("b/c/file5", &mut cx).await;
3354
3355 let file2_id = id_for_path("a/file2", &cx);
3356 let file3_id = id_for_path("a/file3", &cx);
3357 let file4_id = id_for_path("b/c/file4", &cx);
3358
3359 // Wait for the initial scan.
3360 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3361 .await;
3362
3363 // Create a remote copy of this worktree.
3364 let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
3365 let worktree_id = 1;
3366 let share_request = tree.update(&mut cx, |tree, cx| {
3367 tree.as_local().unwrap().share_request(cx)
3368 });
3369 let open_worktree = server.receive::<proto::OpenWorktree>().await.unwrap();
3370 server
3371 .respond(
3372 open_worktree.receipt(),
3373 proto::OpenWorktreeResponse { worktree_id: 1 },
3374 )
3375 .await;
3376
3377 let remote = Worktree::remote(
3378 proto::JoinWorktreeResponse {
3379 worktree: share_request.await.unwrap().worktree,
3380 replica_id: 1,
3381 collaborators: Vec::new(),
3382 },
3383 Client::new(),
3384 user_store,
3385 Default::default(),
3386 &mut cx.to_async(),
3387 )
3388 .await
3389 .unwrap();
3390
3391 cx.read(|cx| {
3392 assert!(!buffer2.read(cx).is_dirty());
3393 assert!(!buffer3.read(cx).is_dirty());
3394 assert!(!buffer4.read(cx).is_dirty());
3395 assert!(!buffer5.read(cx).is_dirty());
3396 });
3397
3398 // Rename and delete files and directories.
3399 tree.flush_fs_events(&cx).await;
3400 std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
3401 std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
3402 std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap();
3403 std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
3404 tree.flush_fs_events(&cx).await;
3405
3406 let expected_paths = vec![
3407 "a",
3408 "a/file1",
3409 "a/file2.new",
3410 "b",
3411 "d",
3412 "d/file3",
3413 "d/file4",
3414 ];
3415
3416 cx.read(|app| {
3417 assert_eq!(
3418 tree.read(app)
3419 .paths()
3420 .map(|p| p.to_str().unwrap())
3421 .collect::<Vec<_>>(),
3422 expected_paths
3423 );
3424
3425 assert_eq!(id_for_path("a/file2.new", &cx), file2_id);
3426 assert_eq!(id_for_path("d/file3", &cx), file3_id);
3427 assert_eq!(id_for_path("d/file4", &cx), file4_id);
3428
3429 assert_eq!(
3430 buffer2.read(app).file().unwrap().path().as_ref(),
3431 Path::new("a/file2.new")
3432 );
3433 assert_eq!(
3434 buffer3.read(app).file().unwrap().path().as_ref(),
3435 Path::new("d/file3")
3436 );
3437 assert_eq!(
3438 buffer4.read(app).file().unwrap().path().as_ref(),
3439 Path::new("d/file4")
3440 );
3441 assert_eq!(
3442 buffer5.read(app).file().unwrap().path().as_ref(),
3443 Path::new("b/c/file5")
3444 );
3445
3446 assert!(!buffer2.read(app).file().unwrap().is_deleted());
3447 assert!(!buffer3.read(app).file().unwrap().is_deleted());
3448 assert!(!buffer4.read(app).file().unwrap().is_deleted());
3449 assert!(buffer5.read(app).file().unwrap().is_deleted());
3450 });
3451
3452 // Update the remote worktree. Check that it becomes consistent with the
3453 // local worktree.
3454 remote.update(&mut cx, |remote, cx| {
3455 let update_message =
3456 tree.read(cx)
3457 .snapshot()
3458 .build_update(&initial_snapshot, worktree_id, true);
3459 remote
3460 .as_remote_mut()
3461 .unwrap()
3462 .snapshot
3463 .apply_update(update_message)
3464 .unwrap();
3465
3466 assert_eq!(
3467 remote
3468 .paths()
3469 .map(|p| p.to_str().unwrap())
3470 .collect::<Vec<_>>(),
3471 expected_paths
3472 );
3473 });
3474 }
3475
3476 #[gpui::test]
3477 async fn test_rescan_with_gitignore(mut cx: gpui::TestAppContext) {
3478 let dir = temp_tree(json!({
3479 ".git": {},
3480 ".gitignore": "ignored-dir\n",
3481 "tracked-dir": {
3482 "tracked-file1": "tracked contents",
3483 },
3484 "ignored-dir": {
3485 "ignored-file1": "ignored contents",
3486 }
3487 }));
3488
3489 let client = Client::new();
3490 let http_client = FakeHttpClient::with_404_response();
3491 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3492
3493 let tree = Worktree::open_local(
3494 client,
3495 user_store,
3496 dir.path(),
3497 Arc::new(RealFs),
3498 Default::default(),
3499 &mut cx.to_async(),
3500 )
3501 .await
3502 .unwrap();
3503 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3504 .await;
3505 tree.flush_fs_events(&cx).await;
3506 cx.read(|cx| {
3507 let tree = tree.read(cx);
3508 let tracked = tree.entry_for_path("tracked-dir/tracked-file1").unwrap();
3509 let ignored = tree.entry_for_path("ignored-dir/ignored-file1").unwrap();
3510 assert_eq!(tracked.is_ignored, false);
3511 assert_eq!(ignored.is_ignored, true);
3512 });
3513
3514 std::fs::write(dir.path().join("tracked-dir/tracked-file2"), "").unwrap();
3515 std::fs::write(dir.path().join("ignored-dir/ignored-file2"), "").unwrap();
3516 tree.flush_fs_events(&cx).await;
3517 cx.read(|cx| {
3518 let tree = tree.read(cx);
3519 let dot_git = tree.entry_for_path(".git").unwrap();
3520 let tracked = tree.entry_for_path("tracked-dir/tracked-file2").unwrap();
3521 let ignored = tree.entry_for_path("ignored-dir/ignored-file2").unwrap();
3522 assert_eq!(tracked.is_ignored, false);
3523 assert_eq!(ignored.is_ignored, true);
3524 assert_eq!(dot_git.is_ignored, true);
3525 });
3526 }
3527
3528 #[gpui::test]
3529 async fn test_open_and_share_worktree(mut cx: gpui::TestAppContext) {
3530 let user_id = 100;
3531 let mut client = Client::new();
3532 let server = FakeServer::for_client(user_id, &mut client, &cx).await;
3533 let user_store = server.build_user_store(client.clone(), &mut cx).await;
3534
3535 let fs = Arc::new(FakeFs::new());
3536 fs.insert_tree(
3537 "/path",
3538 json!({
3539 "to": {
3540 "the-dir": {
3541 ".zed.toml": r#"collaborators = ["friend-1", "friend-2"]"#,
3542 "a.txt": "a-contents",
3543 },
3544 },
3545 }),
3546 )
3547 .await;
3548
3549 let worktree = Worktree::open_local(
3550 client.clone(),
3551 user_store,
3552 "/path/to/the-dir".as_ref(),
3553 fs,
3554 Default::default(),
3555 &mut cx.to_async(),
3556 )
3557 .await
3558 .unwrap();
3559
3560 let open_worktree = server.receive::<proto::OpenWorktree>().await.unwrap();
3561 assert_eq!(
3562 open_worktree.payload,
3563 proto::OpenWorktree {
3564 root_name: "the-dir".to_string(),
3565 authorized_logins: vec!["friend-1".to_string(), "friend-2".to_string()],
3566 }
3567 );
3568
3569 server
3570 .respond(
3571 open_worktree.receipt(),
3572 proto::OpenWorktreeResponse { worktree_id: 5 },
3573 )
3574 .await;
3575 let remote_id = worktree
3576 .update(&mut cx, |tree, _| tree.as_local().unwrap().next_remote_id())
3577 .await;
3578 assert_eq!(remote_id, Some(5));
3579
3580 cx.update(move |_| drop(worktree));
3581 server.receive::<proto::CloseWorktree>().await.unwrap();
3582 }
3583
3584 #[gpui::test]
3585 async fn test_buffer_deduping(mut cx: gpui::TestAppContext) {
3586 let user_id = 100;
3587 let mut client = Client::new();
3588 let server = FakeServer::for_client(user_id, &mut client, &cx).await;
3589 let user_store = server.build_user_store(client.clone(), &mut cx).await;
3590
3591 let fs = Arc::new(FakeFs::new());
3592 fs.insert_tree(
3593 "/the-dir",
3594 json!({
3595 "a.txt": "a-contents",
3596 "b.txt": "b-contents",
3597 }),
3598 )
3599 .await;
3600
3601 let worktree = Worktree::open_local(
3602 client.clone(),
3603 user_store,
3604 "/the-dir".as_ref(),
3605 fs,
3606 Default::default(),
3607 &mut cx.to_async(),
3608 )
3609 .await
3610 .unwrap();
3611
3612 // Spawn multiple tasks to open paths, repeating some paths.
3613 let (buffer_a_1, buffer_b, buffer_a_2) = worktree.update(&mut cx, |worktree, cx| {
3614 (
3615 worktree.open_buffer("a.txt", cx),
3616 worktree.open_buffer("b.txt", cx),
3617 worktree.open_buffer("a.txt", cx),
3618 )
3619 });
3620
3621 let buffer_a_1 = buffer_a_1.await.unwrap();
3622 let buffer_a_2 = buffer_a_2.await.unwrap();
3623 let buffer_b = buffer_b.await.unwrap();
3624 assert_eq!(buffer_a_1.read_with(&cx, |b, _| b.text()), "a-contents");
3625 assert_eq!(buffer_b.read_with(&cx, |b, _| b.text()), "b-contents");
3626
3627 // There is only one buffer per path.
3628 let buffer_a_id = buffer_a_1.id();
3629 assert_eq!(buffer_a_2.id(), buffer_a_id);
3630
3631 // Open the same path again while it is still open.
3632 drop(buffer_a_1);
3633 let buffer_a_3 = worktree
3634 .update(&mut cx, |worktree, cx| worktree.open_buffer("a.txt", cx))
3635 .await
3636 .unwrap();
3637
3638 // There's still only one buffer per path.
3639 assert_eq!(buffer_a_3.id(), buffer_a_id);
3640 }
3641
3642 #[gpui::test]
3643 async fn test_buffer_is_dirty(mut cx: gpui::TestAppContext) {
3644 use std::fs;
3645
3646 let dir = temp_tree(json!({
3647 "file1": "abc",
3648 "file2": "def",
3649 "file3": "ghi",
3650 }));
3651 let client = Client::new();
3652 let http_client = FakeHttpClient::with_404_response();
3653 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3654
3655 let tree = Worktree::open_local(
3656 client,
3657 user_store,
3658 dir.path(),
3659 Arc::new(RealFs),
3660 Default::default(),
3661 &mut cx.to_async(),
3662 )
3663 .await
3664 .unwrap();
3665 tree.flush_fs_events(&cx).await;
3666 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3667 .await;
3668
3669 let buffer1 = tree
3670 .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
3671 .await
3672 .unwrap();
3673 let events = Rc::new(RefCell::new(Vec::new()));
3674
3675 // initially, the buffer isn't dirty.
3676 buffer1.update(&mut cx, |buffer, cx| {
3677 cx.subscribe(&buffer1, {
3678 let events = events.clone();
3679 move |_, _, event, _| events.borrow_mut().push(event.clone())
3680 })
3681 .detach();
3682
3683 assert!(!buffer.is_dirty());
3684 assert!(events.borrow().is_empty());
3685
3686 buffer.edit(vec![1..2], "", cx);
3687 });
3688
3689 // after the first edit, the buffer is dirty, and emits a dirtied event.
3690 buffer1.update(&mut cx, |buffer, cx| {
3691 assert!(buffer.text() == "ac");
3692 assert!(buffer.is_dirty());
3693 assert_eq!(
3694 *events.borrow(),
3695 &[language::Event::Edited, language::Event::Dirtied]
3696 );
3697 events.borrow_mut().clear();
3698 buffer.did_save(buffer.version(), buffer.file().unwrap().mtime(), None, cx);
3699 });
3700
3701 // after saving, the buffer is not dirty, and emits a saved event.
3702 buffer1.update(&mut cx, |buffer, cx| {
3703 assert!(!buffer.is_dirty());
3704 assert_eq!(*events.borrow(), &[language::Event::Saved]);
3705 events.borrow_mut().clear();
3706
3707 buffer.edit(vec![1..1], "B", cx);
3708 buffer.edit(vec![2..2], "D", cx);
3709 });
3710
3711 // after editing again, the buffer is dirty, and emits another dirty event.
3712 buffer1.update(&mut cx, |buffer, cx| {
3713 assert!(buffer.text() == "aBDc");
3714 assert!(buffer.is_dirty());
3715 assert_eq!(
3716 *events.borrow(),
3717 &[
3718 language::Event::Edited,
3719 language::Event::Dirtied,
3720 language::Event::Edited,
3721 ],
3722 );
3723 events.borrow_mut().clear();
3724
3725 // TODO - currently, after restoring the buffer to its
3726 // previously-saved state, the is still considered dirty.
3727 buffer.edit([1..3], "", cx);
3728 assert!(buffer.text() == "ac");
3729 assert!(buffer.is_dirty());
3730 });
3731
3732 assert_eq!(*events.borrow(), &[language::Event::Edited]);
3733
3734 // When a file is deleted, the buffer is considered dirty.
3735 let events = Rc::new(RefCell::new(Vec::new()));
3736 let buffer2 = tree
3737 .update(&mut cx, |tree, cx| tree.open_buffer("file2", cx))
3738 .await
3739 .unwrap();
3740 buffer2.update(&mut cx, |_, cx| {
3741 cx.subscribe(&buffer2, {
3742 let events = events.clone();
3743 move |_, _, event, _| events.borrow_mut().push(event.clone())
3744 })
3745 .detach();
3746 });
3747
3748 fs::remove_file(dir.path().join("file2")).unwrap();
3749 buffer2.condition(&cx, |b, _| b.is_dirty()).await;
3750 assert_eq!(
3751 *events.borrow(),
3752 &[language::Event::Dirtied, language::Event::FileHandleChanged]
3753 );
3754
3755 // When a file is already dirty when deleted, we don't emit a Dirtied event.
3756 let events = Rc::new(RefCell::new(Vec::new()));
3757 let buffer3 = tree
3758 .update(&mut cx, |tree, cx| tree.open_buffer("file3", cx))
3759 .await
3760 .unwrap();
3761 buffer3.update(&mut cx, |_, cx| {
3762 cx.subscribe(&buffer3, {
3763 let events = events.clone();
3764 move |_, _, event, _| events.borrow_mut().push(event.clone())
3765 })
3766 .detach();
3767 });
3768
3769 tree.flush_fs_events(&cx).await;
3770 buffer3.update(&mut cx, |buffer, cx| {
3771 buffer.edit(Some(0..0), "x", cx);
3772 });
3773 events.borrow_mut().clear();
3774 fs::remove_file(dir.path().join("file3")).unwrap();
3775 buffer3
3776 .condition(&cx, |_, _| !events.borrow().is_empty())
3777 .await;
3778 assert_eq!(*events.borrow(), &[language::Event::FileHandleChanged]);
3779 cx.read(|cx| assert!(buffer3.read(cx).is_dirty()));
3780 }
3781
3782 #[gpui::test]
3783 async fn test_buffer_file_changes_on_disk(mut cx: gpui::TestAppContext) {
3784 use std::fs;
3785
3786 let initial_contents = "aaa\nbbbbb\nc\n";
3787 let dir = temp_tree(json!({ "the-file": initial_contents }));
3788 let client = Client::new();
3789 let http_client = FakeHttpClient::with_404_response();
3790 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3791
3792 let tree = Worktree::open_local(
3793 client,
3794 user_store,
3795 dir.path(),
3796 Arc::new(RealFs),
3797 Default::default(),
3798 &mut cx.to_async(),
3799 )
3800 .await
3801 .unwrap();
3802 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3803 .await;
3804
3805 let abs_path = dir.path().join("the-file");
3806 let buffer = tree
3807 .update(&mut cx, |tree, cx| {
3808 tree.open_buffer(Path::new("the-file"), cx)
3809 })
3810 .await
3811 .unwrap();
3812
3813 // TODO
3814 // Add a cursor on each row.
3815 // let selection_set_id = buffer.update(&mut cx, |buffer, cx| {
3816 // assert!(!buffer.is_dirty());
3817 // buffer.add_selection_set(
3818 // &(0..3)
3819 // .map(|row| Selection {
3820 // id: row as usize,
3821 // start: Point::new(row, 1),
3822 // end: Point::new(row, 1),
3823 // reversed: false,
3824 // goal: SelectionGoal::None,
3825 // })
3826 // .collect::<Vec<_>>(),
3827 // cx,
3828 // )
3829 // });
3830
3831 // Change the file on disk, adding two new lines of text, and removing
3832 // one line.
3833 buffer.read_with(&cx, |buffer, _| {
3834 assert!(!buffer.is_dirty());
3835 assert!(!buffer.has_conflict());
3836 });
3837 let new_contents = "AAAA\naaa\nBB\nbbbbb\n";
3838 fs::write(&abs_path, new_contents).unwrap();
3839
3840 // Because the buffer was not modified, it is reloaded from disk. Its
3841 // contents are edited according to the diff between the old and new
3842 // file contents.
3843 buffer
3844 .condition(&cx, |buffer, _| buffer.text() == new_contents)
3845 .await;
3846
3847 buffer.update(&mut cx, |buffer, _| {
3848 assert_eq!(buffer.text(), new_contents);
3849 assert!(!buffer.is_dirty());
3850 assert!(!buffer.has_conflict());
3851
3852 // TODO
3853 // let cursor_positions = buffer
3854 // .selection_set(selection_set_id)
3855 // .unwrap()
3856 // .selections::<Point>(&*buffer)
3857 // .map(|selection| {
3858 // assert_eq!(selection.start, selection.end);
3859 // selection.start
3860 // })
3861 // .collect::<Vec<_>>();
3862 // assert_eq!(
3863 // cursor_positions,
3864 // [Point::new(1, 1), Point::new(3, 1), Point::new(4, 0)]
3865 // );
3866 });
3867
3868 // Modify the buffer
3869 buffer.update(&mut cx, |buffer, cx| {
3870 buffer.edit(vec![0..0], " ", cx);
3871 assert!(buffer.is_dirty());
3872 assert!(!buffer.has_conflict());
3873 });
3874
3875 // Change the file on disk again, adding blank lines to the beginning.
3876 fs::write(&abs_path, "\n\n\nAAAA\naaa\nBB\nbbbbb\n").unwrap();
3877
3878 // Because the buffer is modified, it doesn't reload from disk, but is
3879 // marked as having a conflict.
3880 buffer
3881 .condition(&cx, |buffer, _| buffer.has_conflict())
3882 .await;
3883 }
3884
3885 #[gpui::test]
3886 async fn test_language_server_diagnostics(mut cx: gpui::TestAppContext) {
3887 let (language_server_config, mut fake_server) =
3888 LanguageServerConfig::fake(cx.background()).await;
3889 let mut languages = LanguageRegistry::new();
3890 languages.add(Arc::new(Language::new(
3891 LanguageConfig {
3892 name: "Rust".to_string(),
3893 path_suffixes: vec!["rs".to_string()],
3894 language_server: Some(language_server_config),
3895 ..Default::default()
3896 },
3897 Some(tree_sitter_rust::language()),
3898 )));
3899
3900 let dir = temp_tree(json!({
3901 "a.rs": "fn a() { A }",
3902 "b.rs": "const y: i32 = 1",
3903 }));
3904
3905 let client = Client::new();
3906 let http_client = FakeHttpClient::with_404_response();
3907 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3908
3909 let tree = Worktree::open_local(
3910 client,
3911 user_store,
3912 dir.path(),
3913 Arc::new(RealFs),
3914 Arc::new(languages),
3915 &mut cx.to_async(),
3916 )
3917 .await
3918 .unwrap();
3919 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3920 .await;
3921
3922 // Cause worktree to start the fake language server
3923 let _buffer = tree
3924 .update(&mut cx, |tree, cx| tree.open_buffer("b.rs", cx))
3925 .await
3926 .unwrap();
3927
3928 fake_server
3929 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
3930 uri: Url::from_file_path(dir.path().join("a.rs")).unwrap(),
3931 version: None,
3932 diagnostics: vec![lsp::Diagnostic {
3933 range: lsp::Range::new(lsp::Position::new(0, 9), lsp::Position::new(0, 10)),
3934 severity: Some(lsp::DiagnosticSeverity::ERROR),
3935 message: "undefined variable 'A'".to_string(),
3936 ..Default::default()
3937 }],
3938 })
3939 .await;
3940
3941 let buffer = tree
3942 .update(&mut cx, |tree, cx| tree.open_buffer("a.rs", cx))
3943 .await
3944 .unwrap();
3945
3946 buffer.read_with(&cx, |buffer, _| {
3947 let diagnostics = buffer
3948 .snapshot()
3949 .diagnostics_in_range::<_, Point>(0..buffer.len())
3950 .collect::<Vec<_>>();
3951 assert_eq!(
3952 diagnostics,
3953 &[DiagnosticEntry {
3954 range: Point::new(0, 9)..Point::new(0, 10),
3955 diagnostic: Diagnostic {
3956 severity: lsp::DiagnosticSeverity::ERROR,
3957 message: "undefined variable 'A'".to_string(),
3958 group_id: 0,
3959 is_primary: true,
3960 ..Default::default()
3961 }
3962 }]
3963 )
3964 });
3965 }
3966
3967 #[gpui::test]
3968 async fn test_grouped_diagnostics(mut cx: gpui::TestAppContext) {
3969 let fs = Arc::new(FakeFs::new());
3970 let client = Client::new();
3971 let http_client = FakeHttpClient::with_404_response();
3972 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3973
3974 fs.insert_tree(
3975 "/the-dir",
3976 json!({
3977 "a.rs": "
3978 fn foo(mut v: Vec<usize>) {
3979 for x in &v {
3980 v.push(1);
3981 }
3982 }
3983 "
3984 .unindent(),
3985 }),
3986 )
3987 .await;
3988
3989 let worktree = Worktree::open_local(
3990 client.clone(),
3991 user_store,
3992 "/the-dir".as_ref(),
3993 fs,
3994 Default::default(),
3995 &mut cx.to_async(),
3996 )
3997 .await
3998 .unwrap();
3999
4000 let buffer = worktree
4001 .update(&mut cx, |tree, cx| tree.open_buffer("a.rs", cx))
4002 .await
4003 .unwrap();
4004
4005 let buffer_uri = Url::from_file_path("/the-dir/a.rs").unwrap();
4006 let message = lsp::PublishDiagnosticsParams {
4007 uri: buffer_uri.clone(),
4008 diagnostics: vec![
4009 lsp::Diagnostic {
4010 range: lsp::Range::new(lsp::Position::new(1, 8), lsp::Position::new(1, 9)),
4011 severity: Some(DiagnosticSeverity::WARNING),
4012 message: "error 1".to_string(),
4013 related_information: Some(vec![lsp::DiagnosticRelatedInformation {
4014 location: lsp::Location {
4015 uri: buffer_uri.clone(),
4016 range: lsp::Range::new(
4017 lsp::Position::new(1, 8),
4018 lsp::Position::new(1, 9),
4019 ),
4020 },
4021 message: "error 1 hint 1".to_string(),
4022 }]),
4023 ..Default::default()
4024 },
4025 lsp::Diagnostic {
4026 range: lsp::Range::new(lsp::Position::new(1, 8), lsp::Position::new(1, 9)),
4027 severity: Some(DiagnosticSeverity::HINT),
4028 message: "error 1 hint 1".to_string(),
4029 related_information: Some(vec![lsp::DiagnosticRelatedInformation {
4030 location: lsp::Location {
4031 uri: buffer_uri.clone(),
4032 range: lsp::Range::new(
4033 lsp::Position::new(1, 8),
4034 lsp::Position::new(1, 9),
4035 ),
4036 },
4037 message: "original diagnostic".to_string(),
4038 }]),
4039 ..Default::default()
4040 },
4041 lsp::Diagnostic {
4042 range: lsp::Range::new(lsp::Position::new(2, 8), lsp::Position::new(2, 17)),
4043 severity: Some(DiagnosticSeverity::ERROR),
4044 message: "error 2".to_string(),
4045 related_information: Some(vec![
4046 lsp::DiagnosticRelatedInformation {
4047 location: lsp::Location {
4048 uri: buffer_uri.clone(),
4049 range: lsp::Range::new(
4050 lsp::Position::new(1, 13),
4051 lsp::Position::new(1, 15),
4052 ),
4053 },
4054 message: "error 2 hint 1".to_string(),
4055 },
4056 lsp::DiagnosticRelatedInformation {
4057 location: lsp::Location {
4058 uri: buffer_uri.clone(),
4059 range: lsp::Range::new(
4060 lsp::Position::new(1, 13),
4061 lsp::Position::new(1, 15),
4062 ),
4063 },
4064 message: "error 2 hint 2".to_string(),
4065 },
4066 ]),
4067 ..Default::default()
4068 },
4069 lsp::Diagnostic {
4070 range: lsp::Range::new(lsp::Position::new(1, 13), lsp::Position::new(1, 15)),
4071 severity: Some(DiagnosticSeverity::HINT),
4072 message: "error 2 hint 1".to_string(),
4073 related_information: Some(vec![lsp::DiagnosticRelatedInformation {
4074 location: lsp::Location {
4075 uri: buffer_uri.clone(),
4076 range: lsp::Range::new(
4077 lsp::Position::new(2, 8),
4078 lsp::Position::new(2, 17),
4079 ),
4080 },
4081 message: "original diagnostic".to_string(),
4082 }]),
4083 ..Default::default()
4084 },
4085 lsp::Diagnostic {
4086 range: lsp::Range::new(lsp::Position::new(1, 13), lsp::Position::new(1, 15)),
4087 severity: Some(DiagnosticSeverity::HINT),
4088 message: "error 2 hint 2".to_string(),
4089 related_information: Some(vec![lsp::DiagnosticRelatedInformation {
4090 location: lsp::Location {
4091 uri: buffer_uri.clone(),
4092 range: lsp::Range::new(
4093 lsp::Position::new(2, 8),
4094 lsp::Position::new(2, 17),
4095 ),
4096 },
4097 message: "original diagnostic".to_string(),
4098 }]),
4099 ..Default::default()
4100 },
4101 ],
4102 version: None,
4103 };
4104
4105 worktree
4106 .update(&mut cx, |tree, cx| tree.update_diagnostics(message, cx))
4107 .unwrap();
4108 let buffer = buffer.read_with(&cx, |buffer, _| buffer.snapshot());
4109
4110 assert_eq!(
4111 buffer
4112 .diagnostics_in_range::<_, Point>(0..buffer.len())
4113 .collect::<Vec<_>>(),
4114 &[
4115 DiagnosticEntry {
4116 range: Point::new(1, 8)..Point::new(1, 9),
4117 diagnostic: Diagnostic {
4118 severity: DiagnosticSeverity::WARNING,
4119 message: "error 1".to_string(),
4120 group_id: 0,
4121 is_primary: true,
4122 ..Default::default()
4123 }
4124 },
4125 DiagnosticEntry {
4126 range: Point::new(1, 8)..Point::new(1, 9),
4127 diagnostic: Diagnostic {
4128 severity: DiagnosticSeverity::HINT,
4129 message: "error 1 hint 1".to_string(),
4130 group_id: 0,
4131 is_primary: false,
4132 ..Default::default()
4133 }
4134 },
4135 DiagnosticEntry {
4136 range: Point::new(1, 13)..Point::new(1, 15),
4137 diagnostic: Diagnostic {
4138 severity: DiagnosticSeverity::HINT,
4139 message: "error 2 hint 1".to_string(),
4140 group_id: 1,
4141 is_primary: false,
4142 ..Default::default()
4143 }
4144 },
4145 DiagnosticEntry {
4146 range: Point::new(1, 13)..Point::new(1, 15),
4147 diagnostic: Diagnostic {
4148 severity: DiagnosticSeverity::HINT,
4149 message: "error 2 hint 2".to_string(),
4150 group_id: 1,
4151 is_primary: false,
4152 ..Default::default()
4153 }
4154 },
4155 DiagnosticEntry {
4156 range: Point::new(2, 8)..Point::new(2, 17),
4157 diagnostic: Diagnostic {
4158 severity: DiagnosticSeverity::ERROR,
4159 message: "error 2".to_string(),
4160 group_id: 1,
4161 is_primary: true,
4162 ..Default::default()
4163 }
4164 }
4165 ]
4166 );
4167
4168 assert_eq!(
4169 buffer.diagnostic_group::<Point>(0).collect::<Vec<_>>(),
4170 &[
4171 DiagnosticEntry {
4172 range: Point::new(1, 8)..Point::new(1, 9),
4173 diagnostic: Diagnostic {
4174 severity: DiagnosticSeverity::WARNING,
4175 message: "error 1".to_string(),
4176 group_id: 0,
4177 is_primary: true,
4178 ..Default::default()
4179 }
4180 },
4181 DiagnosticEntry {
4182 range: Point::new(1, 8)..Point::new(1, 9),
4183 diagnostic: Diagnostic {
4184 severity: DiagnosticSeverity::HINT,
4185 message: "error 1 hint 1".to_string(),
4186 group_id: 0,
4187 is_primary: false,
4188 ..Default::default()
4189 }
4190 },
4191 ]
4192 );
4193 assert_eq!(
4194 buffer.diagnostic_group::<Point>(1).collect::<Vec<_>>(),
4195 &[
4196 DiagnosticEntry {
4197 range: Point::new(1, 13)..Point::new(1, 15),
4198 diagnostic: Diagnostic {
4199 severity: DiagnosticSeverity::HINT,
4200 message: "error 2 hint 1".to_string(),
4201 group_id: 1,
4202 is_primary: false,
4203 ..Default::default()
4204 }
4205 },
4206 DiagnosticEntry {
4207 range: Point::new(1, 13)..Point::new(1, 15),
4208 diagnostic: Diagnostic {
4209 severity: DiagnosticSeverity::HINT,
4210 message: "error 2 hint 2".to_string(),
4211 group_id: 1,
4212 is_primary: false,
4213 ..Default::default()
4214 }
4215 },
4216 DiagnosticEntry {
4217 range: Point::new(2, 8)..Point::new(2, 17),
4218 diagnostic: Diagnostic {
4219 severity: DiagnosticSeverity::ERROR,
4220 message: "error 2".to_string(),
4221 group_id: 1,
4222 is_primary: true,
4223 ..Default::default()
4224 }
4225 }
4226 ]
4227 );
4228 }
4229
4230 #[gpui::test(iterations = 100)]
4231 fn test_random(mut rng: StdRng) {
4232 let operations = env::var("OPERATIONS")
4233 .map(|o| o.parse().unwrap())
4234 .unwrap_or(40);
4235 let initial_entries = env::var("INITIAL_ENTRIES")
4236 .map(|o| o.parse().unwrap())
4237 .unwrap_or(20);
4238
4239 let root_dir = tempdir::TempDir::new("worktree-test").unwrap();
4240 for _ in 0..initial_entries {
4241 randomly_mutate_tree(root_dir.path(), 1.0, &mut rng).unwrap();
4242 }
4243 log::info!("Generated initial tree");
4244
4245 let (notify_tx, _notify_rx) = smol::channel::unbounded();
4246 let fs = Arc::new(RealFs);
4247 let next_entry_id = Arc::new(AtomicUsize::new(0));
4248 let mut initial_snapshot = Snapshot {
4249 id: 0,
4250 scan_id: 0,
4251 abs_path: root_dir.path().into(),
4252 entries_by_path: Default::default(),
4253 entries_by_id: Default::default(),
4254 removed_entry_ids: Default::default(),
4255 ignores: Default::default(),
4256 root_name: Default::default(),
4257 root_char_bag: Default::default(),
4258 next_entry_id: next_entry_id.clone(),
4259 };
4260 initial_snapshot.insert_entry(
4261 Entry::new(
4262 Path::new("").into(),
4263 &smol::block_on(fs.metadata(root_dir.path()))
4264 .unwrap()
4265 .unwrap(),
4266 &next_entry_id,
4267 Default::default(),
4268 ),
4269 fs.as_ref(),
4270 );
4271 let mut scanner = BackgroundScanner::new(
4272 Arc::new(Mutex::new(initial_snapshot.clone())),
4273 notify_tx,
4274 fs.clone(),
4275 Arc::new(gpui::executor::Background::new()),
4276 );
4277 smol::block_on(scanner.scan_dirs()).unwrap();
4278 scanner.snapshot().check_invariants();
4279
4280 let mut events = Vec::new();
4281 let mut snapshots = Vec::new();
4282 let mut mutations_len = operations;
4283 while mutations_len > 1 {
4284 if !events.is_empty() && rng.gen_bool(0.4) {
4285 let len = rng.gen_range(0..=events.len());
4286 let to_deliver = events.drain(0..len).collect::<Vec<_>>();
4287 log::info!("Delivering events: {:#?}", to_deliver);
4288 smol::block_on(scanner.process_events(to_deliver));
4289 scanner.snapshot().check_invariants();
4290 } else {
4291 events.extend(randomly_mutate_tree(root_dir.path(), 0.6, &mut rng).unwrap());
4292 mutations_len -= 1;
4293 }
4294
4295 if rng.gen_bool(0.2) {
4296 snapshots.push(scanner.snapshot());
4297 }
4298 }
4299 log::info!("Quiescing: {:#?}", events);
4300 smol::block_on(scanner.process_events(events));
4301 scanner.snapshot().check_invariants();
4302
4303 let (notify_tx, _notify_rx) = smol::channel::unbounded();
4304 let mut new_scanner = BackgroundScanner::new(
4305 Arc::new(Mutex::new(initial_snapshot)),
4306 notify_tx,
4307 scanner.fs.clone(),
4308 scanner.executor.clone(),
4309 );
4310 smol::block_on(new_scanner.scan_dirs()).unwrap();
4311 assert_eq!(
4312 scanner.snapshot().to_vec(true),
4313 new_scanner.snapshot().to_vec(true)
4314 );
4315
4316 for mut prev_snapshot in snapshots {
4317 let include_ignored = rng.gen::<bool>();
4318 if !include_ignored {
4319 let mut entries_by_path_edits = Vec::new();
4320 let mut entries_by_id_edits = Vec::new();
4321 for entry in prev_snapshot
4322 .entries_by_id
4323 .cursor::<()>()
4324 .filter(|e| e.is_ignored)
4325 {
4326 entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
4327 entries_by_id_edits.push(Edit::Remove(entry.id));
4328 }
4329
4330 prev_snapshot
4331 .entries_by_path
4332 .edit(entries_by_path_edits, &());
4333 prev_snapshot.entries_by_id.edit(entries_by_id_edits, &());
4334 }
4335
4336 let update = scanner
4337 .snapshot()
4338 .build_update(&prev_snapshot, 0, include_ignored);
4339 prev_snapshot.apply_update(update).unwrap();
4340 assert_eq!(
4341 prev_snapshot.to_vec(true),
4342 scanner.snapshot().to_vec(include_ignored)
4343 );
4344 }
4345 }
4346
4347 fn randomly_mutate_tree(
4348 root_path: &Path,
4349 insertion_probability: f64,
4350 rng: &mut impl Rng,
4351 ) -> Result<Vec<fsevent::Event>> {
4352 let root_path = root_path.canonicalize().unwrap();
4353 let (dirs, files) = read_dir_recursive(root_path.clone());
4354
4355 let mut events = Vec::new();
4356 let mut record_event = |path: PathBuf| {
4357 events.push(fsevent::Event {
4358 event_id: SystemTime::now()
4359 .duration_since(UNIX_EPOCH)
4360 .unwrap()
4361 .as_secs(),
4362 flags: fsevent::StreamFlags::empty(),
4363 path,
4364 });
4365 };
4366
4367 if (files.is_empty() && dirs.len() == 1) || rng.gen_bool(insertion_probability) {
4368 let path = dirs.choose(rng).unwrap();
4369 let new_path = path.join(gen_name(rng));
4370
4371 if rng.gen() {
4372 log::info!("Creating dir {:?}", new_path.strip_prefix(root_path)?);
4373 std::fs::create_dir(&new_path)?;
4374 } else {
4375 log::info!("Creating file {:?}", new_path.strip_prefix(root_path)?);
4376 std::fs::write(&new_path, "")?;
4377 }
4378 record_event(new_path);
4379 } else if rng.gen_bool(0.05) {
4380 let ignore_dir_path = dirs.choose(rng).unwrap();
4381 let ignore_path = ignore_dir_path.join(&*GITIGNORE);
4382
4383 let (subdirs, subfiles) = read_dir_recursive(ignore_dir_path.clone());
4384 let files_to_ignore = {
4385 let len = rng.gen_range(0..=subfiles.len());
4386 subfiles.choose_multiple(rng, len)
4387 };
4388 let dirs_to_ignore = {
4389 let len = rng.gen_range(0..subdirs.len());
4390 subdirs.choose_multiple(rng, len)
4391 };
4392
4393 let mut ignore_contents = String::new();
4394 for path_to_ignore in files_to_ignore.chain(dirs_to_ignore) {
4395 write!(
4396 ignore_contents,
4397 "{}\n",
4398 path_to_ignore
4399 .strip_prefix(&ignore_dir_path)?
4400 .to_str()
4401 .unwrap()
4402 )
4403 .unwrap();
4404 }
4405 log::info!(
4406 "Creating {:?} with contents:\n{}",
4407 ignore_path.strip_prefix(&root_path)?,
4408 ignore_contents
4409 );
4410 std::fs::write(&ignore_path, ignore_contents).unwrap();
4411 record_event(ignore_path);
4412 } else {
4413 let old_path = {
4414 let file_path = files.choose(rng);
4415 let dir_path = dirs[1..].choose(rng);
4416 file_path.into_iter().chain(dir_path).choose(rng).unwrap()
4417 };
4418
4419 let is_rename = rng.gen();
4420 if is_rename {
4421 let new_path_parent = dirs
4422 .iter()
4423 .filter(|d| !d.starts_with(old_path))
4424 .choose(rng)
4425 .unwrap();
4426
4427 let overwrite_existing_dir =
4428 !old_path.starts_with(&new_path_parent) && rng.gen_bool(0.3);
4429 let new_path = if overwrite_existing_dir {
4430 std::fs::remove_dir_all(&new_path_parent).ok();
4431 new_path_parent.to_path_buf()
4432 } else {
4433 new_path_parent.join(gen_name(rng))
4434 };
4435
4436 log::info!(
4437 "Renaming {:?} to {}{:?}",
4438 old_path.strip_prefix(&root_path)?,
4439 if overwrite_existing_dir {
4440 "overwrite "
4441 } else {
4442 ""
4443 },
4444 new_path.strip_prefix(&root_path)?
4445 );
4446 std::fs::rename(&old_path, &new_path)?;
4447 record_event(old_path.clone());
4448 record_event(new_path);
4449 } else if old_path.is_dir() {
4450 let (dirs, files) = read_dir_recursive(old_path.clone());
4451
4452 log::info!("Deleting dir {:?}", old_path.strip_prefix(&root_path)?);
4453 std::fs::remove_dir_all(&old_path).unwrap();
4454 for file in files {
4455 record_event(file);
4456 }
4457 for dir in dirs {
4458 record_event(dir);
4459 }
4460 } else {
4461 log::info!("Deleting file {:?}", old_path.strip_prefix(&root_path)?);
4462 std::fs::remove_file(old_path).unwrap();
4463 record_event(old_path.clone());
4464 }
4465 }
4466
4467 Ok(events)
4468 }
4469
4470 fn read_dir_recursive(path: PathBuf) -> (Vec<PathBuf>, Vec<PathBuf>) {
4471 let child_entries = std::fs::read_dir(&path).unwrap();
4472 let mut dirs = vec![path];
4473 let mut files = Vec::new();
4474 for child_entry in child_entries {
4475 let child_path = child_entry.unwrap().path();
4476 if child_path.is_dir() {
4477 let (child_dirs, child_files) = read_dir_recursive(child_path);
4478 dirs.extend(child_dirs);
4479 files.extend(child_files);
4480 } else {
4481 files.push(child_path);
4482 }
4483 }
4484 (dirs, files)
4485 }
4486
4487 fn gen_name(rng: &mut impl Rng) -> String {
4488 (0..6)
4489 .map(|_| rng.sample(rand::distributions::Alphanumeric))
4490 .map(char::from)
4491 .collect()
4492 }
4493
4494 impl Snapshot {
4495 fn check_invariants(&self) {
4496 let mut files = self.files(true, 0);
4497 let mut visible_files = self.files(false, 0);
4498 for entry in self.entries_by_path.cursor::<()>() {
4499 if entry.is_file() {
4500 assert_eq!(files.next().unwrap().inode, entry.inode);
4501 if !entry.is_ignored {
4502 assert_eq!(visible_files.next().unwrap().inode, entry.inode);
4503 }
4504 }
4505 }
4506 assert!(files.next().is_none());
4507 assert!(visible_files.next().is_none());
4508
4509 let mut bfs_paths = Vec::new();
4510 let mut stack = vec![Path::new("")];
4511 while let Some(path) = stack.pop() {
4512 bfs_paths.push(path);
4513 let ix = stack.len();
4514 for child_entry in self.child_entries(path) {
4515 stack.insert(ix, &child_entry.path);
4516 }
4517 }
4518
4519 let dfs_paths = self
4520 .entries_by_path
4521 .cursor::<()>()
4522 .map(|e| e.path.as_ref())
4523 .collect::<Vec<_>>();
4524 assert_eq!(bfs_paths, dfs_paths);
4525
4526 for (ignore_parent_path, _) in &self.ignores {
4527 assert!(self.entry_for_path(ignore_parent_path).is_some());
4528 assert!(self
4529 .entry_for_path(ignore_parent_path.join(&*GITIGNORE))
4530 .is_some());
4531 }
4532 }
4533
4534 fn to_vec(&self, include_ignored: bool) -> Vec<(&Path, u64, bool)> {
4535 let mut paths = Vec::new();
4536 for entry in self.entries_by_path.cursor::<()>() {
4537 if include_ignored || !entry.is_ignored {
4538 paths.push((entry.path.as_ref(), entry.inode, entry.is_ignored));
4539 }
4540 }
4541 paths.sort_by(|a, b| a.0.cmp(&b.0));
4542 paths
4543 }
4544 }
4545}