1use super::{
2 fs::{self, Fs},
3 ignore::IgnoreStack,
4};
5use ::ignore::gitignore::{Gitignore, GitignoreBuilder};
6use anyhow::{anyhow, Context, Result};
7use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
8use clock::ReplicaId;
9use futures::{Stream, StreamExt};
10use fuzzy::CharBag;
11use gpui::{
12 executor, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
13 Task, UpgradeModelHandle, WeakModelHandle,
14};
15use language::{Buffer, Language, LanguageRegistry, Operation, Rope};
16use lazy_static::lazy_static;
17use lsp::LanguageServer;
18use parking_lot::Mutex;
19use postage::{
20 prelude::{Sink as _, Stream as _},
21 watch,
22};
23use serde::Deserialize;
24use smol::channel::{self, Sender};
25use std::{
26 any::Any,
27 cmp::{self, Ordering},
28 collections::HashMap,
29 convert::{TryFrom, TryInto},
30 ffi::{OsStr, OsString},
31 fmt,
32 future::Future,
33 ops::Deref,
34 path::{Path, PathBuf},
35 sync::{
36 atomic::{AtomicUsize, Ordering::SeqCst},
37 Arc,
38 },
39 time::{Duration, SystemTime},
40};
41use sum_tree::Bias;
42use sum_tree::{Edit, SeekTarget, SumTree};
43use util::{ResultExt, TryFutureExt};
44
lazy_static! {
    // The `.gitignore` file name as an `OsStr`, constructed once on first access.
    static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
}
48
/// Scan status values sent over the worktree's scan-state channels.
#[derive(Clone, Debug)]
enum ScanState {
    /// Presumably: no scan is currently running — confirm against the scanner.
    Idle,
    /// A scan is in progress; `LocalWorktree::is_scanning` tests for this variant.
    Scanning,
    /// Carries a shared error; presumably reported when scanning fails — confirm.
    Err(Arc<anyhow::Error>),
}
55
/// A worktree model, which is either local or remote.
pub enum Worktree {
    /// Created by `Worktree::open_local` from a path on a local `Fs`.
    Local(LocalWorktree),
    /// Created by `Worktree::open_remote` by joining a worktree through the client.
    Remote(RemoteWorktree),
}
60
/// Events emitted by a `Worktree` model.
pub enum Event {
    /// Emitted when an `UnshareWorktree` message is handled.
    Closed,
}
64
/// A participant in a worktree, built from a `proto::Collaborator` message.
#[derive(Clone, Debug)]
pub struct Collaborator {
    /// The user record fetched from the `UserStore` for the message's `user_id`.
    pub user: Arc<User>,
    /// The peer id taken from the message.
    pub peer_id: PeerId,
    /// The replica id taken from the message.
    pub replica_id: ReplicaId,
}
71
72impl Collaborator {
73 fn from_proto(
74 message: proto::Collaborator,
75 user_store: &ModelHandle<UserStore>,
76 cx: &mut AsyncAppContext,
77 ) -> impl Future<Output = Result<Self>> {
78 let user = user_store.update(cx, |user_store, cx| {
79 user_store.fetch_user(message.user_id, cx)
80 });
81
82 async move {
83 Ok(Self {
84 peer_id: PeerId(message.peer_id),
85 user: user.await?,
86 replica_id: message.replica_id as ReplicaId,
87 })
88 }
89 }
90}
91
92impl Entity for Worktree {
93 type Event = Event;
94
95 fn release(&mut self, cx: &mut MutableAppContext) {
96 match self {
97 Self::Local(tree) => {
98 if let Some(worktree_id) = *tree.remote_id.borrow() {
99 let rpc = tree.client.clone();
100 cx.spawn(|_| async move {
101 if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await {
102 log::error!("error closing worktree: {}", err);
103 }
104 })
105 .detach();
106 }
107 }
108 Self::Remote(tree) => {
109 let rpc = tree.client.clone();
110 let worktree_id = tree.remote_id;
111 cx.spawn(|_| async move {
112 if let Err(err) = rpc.send(proto::LeaveWorktree { worktree_id }).await {
113 log::error!("error closing worktree: {}", err);
114 }
115 })
116 .detach();
117 }
118 }
119 }
120
121 fn app_will_quit(
122 &mut self,
123 _: &mut MutableAppContext,
124 ) -> Option<std::pin::Pin<Box<dyn 'static + Future<Output = ()>>>> {
125 use futures::FutureExt;
126
127 if let Self::Local(worktree) = self {
128 let shutdown_futures = worktree
129 .language_servers
130 .drain()
131 .filter_map(|(_, server)| server.shutdown())
132 .collect::<Vec<_>>();
133 Some(
134 async move {
135 futures::future::join_all(shutdown_futures).await;
136 }
137 .boxed(),
138 )
139 } else {
140 None
141 }
142 }
143}
144
145impl Worktree {
146 pub async fn open_local(
147 client: Arc<Client>,
148 user_store: ModelHandle<UserStore>,
149 path: impl Into<Arc<Path>>,
150 fs: Arc<dyn Fs>,
151 languages: Arc<LanguageRegistry>,
152 cx: &mut AsyncAppContext,
153 ) -> Result<ModelHandle<Self>> {
154 let (tree, scan_states_tx) =
155 LocalWorktree::new(client, user_store, path, fs.clone(), languages, cx).await?;
156 tree.update(cx, |tree, cx| {
157 let tree = tree.as_local_mut().unwrap();
158 let abs_path = tree.snapshot.abs_path.clone();
159 let background_snapshot = tree.background_snapshot.clone();
160 let background = cx.background().clone();
161 tree._background_scanner_task = Some(cx.background().spawn(async move {
162 let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
163 let scanner =
164 BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
165 scanner.run(events).await;
166 }));
167 });
168 Ok(tree)
169 }
170
171 pub async fn open_remote(
172 client: Arc<Client>,
173 id: u64,
174 languages: Arc<LanguageRegistry>,
175 user_store: ModelHandle<UserStore>,
176 cx: &mut AsyncAppContext,
177 ) -> Result<ModelHandle<Self>> {
178 let response = client
179 .request(proto::JoinWorktree { worktree_id: id })
180 .await?;
181 Worktree::remote(response, client, user_store, languages, cx).await
182 }
183
184 async fn remote(
185 join_response: proto::JoinWorktreeResponse,
186 client: Arc<Client>,
187 user_store: ModelHandle<UserStore>,
188 languages: Arc<LanguageRegistry>,
189 cx: &mut AsyncAppContext,
190 ) -> Result<ModelHandle<Self>> {
191 let worktree = join_response
192 .worktree
193 .ok_or_else(|| anyhow!("empty worktree"))?;
194
195 let remote_id = worktree.id;
196 let replica_id = join_response.replica_id as ReplicaId;
197 let root_char_bag: CharBag = worktree
198 .root_name
199 .chars()
200 .map(|c| c.to_ascii_lowercase())
201 .collect();
202 let root_name = worktree.root_name.clone();
203 let (entries_by_path, entries_by_id) = cx
204 .background()
205 .spawn(async move {
206 let mut entries_by_path_edits = Vec::new();
207 let mut entries_by_id_edits = Vec::new();
208 for entry in worktree.entries {
209 match Entry::try_from((&root_char_bag, entry)) {
210 Ok(entry) => {
211 entries_by_id_edits.push(Edit::Insert(PathEntry {
212 id: entry.id,
213 path: entry.path.clone(),
214 is_ignored: entry.is_ignored,
215 scan_id: 0,
216 }));
217 entries_by_path_edits.push(Edit::Insert(entry));
218 }
219 Err(err) => log::warn!("error for remote worktree entry {:?}", err),
220 }
221 }
222
223 let mut entries_by_path = SumTree::new();
224 let mut entries_by_id = SumTree::new();
225 entries_by_path.edit(entries_by_path_edits, &());
226 entries_by_id.edit(entries_by_id_edits, &());
227 (entries_by_path, entries_by_id)
228 })
229 .await;
230
231 let user_ids = join_response
232 .collaborators
233 .iter()
234 .map(|peer| peer.user_id)
235 .collect();
236 user_store
237 .update(cx, |user_store, cx| user_store.load_users(user_ids, cx))
238 .await?;
239 let mut collaborators = HashMap::with_capacity(join_response.collaborators.len());
240 for message in join_response.collaborators {
241 let collaborator = Collaborator::from_proto(message, &user_store, cx).await?;
242 collaborators.insert(collaborator.peer_id, collaborator);
243 }
244
245 let worktree = cx.update(|cx| {
246 cx.add_model(|cx: &mut ModelContext<Worktree>| {
247 let snapshot = Snapshot {
248 id: cx.model_id(),
249 scan_id: 0,
250 abs_path: Path::new("").into(),
251 root_name,
252 root_char_bag,
253 ignores: Default::default(),
254 entries_by_path,
255 entries_by_id,
256 removed_entry_ids: Default::default(),
257 next_entry_id: Default::default(),
258 };
259
260 let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
261 let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());
262
263 cx.background()
264 .spawn(async move {
265 while let Some(update) = updates_rx.recv().await {
266 let mut snapshot = snapshot_tx.borrow().clone();
267 if let Err(error) = snapshot.apply_update(update) {
268 log::error!("error applying worktree update: {}", error);
269 }
270 *snapshot_tx.borrow_mut() = snapshot;
271 }
272 })
273 .detach();
274
275 {
276 let mut snapshot_rx = snapshot_rx.clone();
277 cx.spawn_weak(|this, mut cx| async move {
278 while let Some(_) = snapshot_rx.recv().await {
279 if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
280 this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
281 } else {
282 break;
283 }
284 }
285 })
286 .detach();
287 }
288
289 let _subscriptions = vec![
290 client.subscribe_to_entity(remote_id, cx, Self::handle_add_collaborator),
291 client.subscribe_to_entity(remote_id, cx, Self::handle_remove_collaborator),
292 client.subscribe_to_entity(remote_id, cx, Self::handle_update),
293 client.subscribe_to_entity(remote_id, cx, Self::handle_update_buffer),
294 client.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved),
295 client.subscribe_to_entity(remote_id, cx, Self::handle_unshare),
296 ];
297
298 Worktree::Remote(RemoteWorktree {
299 remote_id,
300 replica_id,
301 snapshot,
302 snapshot_rx,
303 updates_tx,
304 client: client.clone(),
305 open_buffers: Default::default(),
306 collaborators,
307 queued_operations: Default::default(),
308 languages,
309 user_store,
310 _subscriptions,
311 })
312 })
313 });
314
315 Ok(worktree)
316 }
317
318 pub fn as_local(&self) -> Option<&LocalWorktree> {
319 if let Worktree::Local(worktree) = self {
320 Some(worktree)
321 } else {
322 None
323 }
324 }
325
326 pub fn as_local_mut(&mut self) -> Option<&mut LocalWorktree> {
327 if let Worktree::Local(worktree) = self {
328 Some(worktree)
329 } else {
330 None
331 }
332 }
333
334 pub fn as_remote_mut(&mut self) -> Option<&mut RemoteWorktree> {
335 if let Worktree::Remote(worktree) = self {
336 Some(worktree)
337 } else {
338 None
339 }
340 }
341
342 pub fn snapshot(&self) -> Snapshot {
343 match self {
344 Worktree::Local(worktree) => worktree.snapshot(),
345 Worktree::Remote(worktree) => worktree.snapshot(),
346 }
347 }
348
349 pub fn replica_id(&self) -> ReplicaId {
350 match self {
351 Worktree::Local(_) => 0,
352 Worktree::Remote(worktree) => worktree.replica_id,
353 }
354 }
355
356 pub fn languages(&self) -> &Arc<LanguageRegistry> {
357 match self {
358 Worktree::Local(worktree) => &worktree.languages,
359 Worktree::Remote(worktree) => &worktree.languages,
360 }
361 }
362
363 pub fn user_store(&self) -> &ModelHandle<UserStore> {
364 match self {
365 Worktree::Local(worktree) => &worktree.user_store,
366 Worktree::Remote(worktree) => &worktree.user_store,
367 }
368 }
369
370 pub fn handle_add_collaborator(
371 &mut self,
372 mut envelope: TypedEnvelope<proto::AddCollaborator>,
373 _: Arc<Client>,
374 cx: &mut ModelContext<Self>,
375 ) -> Result<()> {
376 let user_store = self.user_store().clone();
377 let collaborator = envelope
378 .payload
379 .collaborator
380 .take()
381 .ok_or_else(|| anyhow!("empty collaborator"))?;
382
383 cx.spawn(|this, mut cx| {
384 async move {
385 let collaborator =
386 Collaborator::from_proto(collaborator, &user_store, &mut cx).await?;
387 this.update(&mut cx, |this, cx| match this {
388 Worktree::Local(worktree) => worktree.add_collaborator(collaborator, cx),
389 Worktree::Remote(worktree) => worktree.add_collaborator(collaborator, cx),
390 });
391 Ok(())
392 }
393 .log_err()
394 })
395 .detach();
396
397 Ok(())
398 }
399
400 pub fn handle_remove_collaborator(
401 &mut self,
402 envelope: TypedEnvelope<proto::RemoveCollaborator>,
403 _: Arc<Client>,
404 cx: &mut ModelContext<Self>,
405 ) -> Result<()> {
406 match self {
407 Worktree::Local(worktree) => worktree.remove_collaborator(envelope, cx),
408 Worktree::Remote(worktree) => worktree.remove_collaborator(envelope, cx),
409 }
410 }
411
412 pub fn handle_update(
413 &mut self,
414 envelope: TypedEnvelope<proto::UpdateWorktree>,
415 _: Arc<Client>,
416 cx: &mut ModelContext<Self>,
417 ) -> anyhow::Result<()> {
418 self.as_remote_mut()
419 .unwrap()
420 .update_from_remote(envelope, cx)
421 }
422
423 pub fn handle_open_buffer(
424 &mut self,
425 envelope: TypedEnvelope<proto::OpenBuffer>,
426 rpc: Arc<Client>,
427 cx: &mut ModelContext<Self>,
428 ) -> anyhow::Result<()> {
429 let receipt = envelope.receipt();
430
431 let response = self
432 .as_local_mut()
433 .unwrap()
434 .open_remote_buffer(envelope, cx);
435
436 cx.background()
437 .spawn(
438 async move {
439 rpc.respond(receipt, response.await?).await?;
440 Ok(())
441 }
442 .log_err(),
443 )
444 .detach();
445
446 Ok(())
447 }
448
449 pub fn handle_close_buffer(
450 &mut self,
451 envelope: TypedEnvelope<proto::CloseBuffer>,
452 _: Arc<Client>,
453 cx: &mut ModelContext<Self>,
454 ) -> anyhow::Result<()> {
455 self.as_local_mut()
456 .unwrap()
457 .close_remote_buffer(envelope, cx)
458 }
459
460 pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
461 match self {
462 Worktree::Local(worktree) => &worktree.collaborators,
463 Worktree::Remote(worktree) => &worktree.collaborators,
464 }
465 }
466
467 pub fn open_buffer(
468 &mut self,
469 path: impl AsRef<Path>,
470 cx: &mut ModelContext<Self>,
471 ) -> Task<Result<ModelHandle<Buffer>>> {
472 match self {
473 Worktree::Local(worktree) => worktree.open_buffer(path.as_ref(), cx),
474 Worktree::Remote(worktree) => worktree.open_buffer(path.as_ref(), cx),
475 }
476 }
477
478 #[cfg(feature = "test-support")]
479 pub fn has_open_buffer(&self, path: impl AsRef<Path>, cx: &AppContext) -> bool {
480 let mut open_buffers: Box<dyn Iterator<Item = _>> = match self {
481 Worktree::Local(worktree) => Box::new(worktree.open_buffers.values()),
482 Worktree::Remote(worktree) => {
483 Box::new(worktree.open_buffers.values().filter_map(|buf| {
484 if let RemoteBuffer::Loaded(buf) = buf {
485 Some(buf)
486 } else {
487 None
488 }
489 }))
490 }
491 };
492
493 let path = path.as_ref();
494 open_buffers
495 .find(|buffer| {
496 if let Some(file) = buffer.upgrade(cx).and_then(|buffer| buffer.read(cx).file()) {
497 file.path().as_ref() == path
498 } else {
499 false
500 }
501 })
502 .is_some()
503 }
504
505 pub fn handle_update_buffer(
506 &mut self,
507 envelope: TypedEnvelope<proto::UpdateBuffer>,
508 _: Arc<Client>,
509 cx: &mut ModelContext<Self>,
510 ) -> Result<()> {
511 let payload = envelope.payload.clone();
512 let buffer_id = payload.buffer_id as usize;
513 let ops = payload
514 .operations
515 .into_iter()
516 .map(|op| language::proto::deserialize_operation(op))
517 .collect::<Result<Vec<_>, _>>()?;
518
519 match self {
520 Worktree::Local(worktree) => {
521 let buffer = worktree
522 .open_buffers
523 .get(&buffer_id)
524 .and_then(|buf| buf.upgrade(cx))
525 .ok_or_else(|| {
526 anyhow!("invalid buffer {} in update buffer message", buffer_id)
527 })?;
528 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
529 }
530 Worktree::Remote(worktree) => match worktree.open_buffers.get_mut(&buffer_id) {
531 Some(RemoteBuffer::Operations(pending_ops)) => pending_ops.extend(ops),
532 Some(RemoteBuffer::Loaded(buffer)) => {
533 if let Some(buffer) = buffer.upgrade(cx) {
534 buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
535 } else {
536 worktree
537 .open_buffers
538 .insert(buffer_id, RemoteBuffer::Operations(ops));
539 }
540 }
541 None => {
542 worktree
543 .open_buffers
544 .insert(buffer_id, RemoteBuffer::Operations(ops));
545 }
546 },
547 }
548
549 Ok(())
550 }
551
552 pub fn handle_save_buffer(
553 &mut self,
554 envelope: TypedEnvelope<proto::SaveBuffer>,
555 rpc: Arc<Client>,
556 cx: &mut ModelContext<Self>,
557 ) -> Result<()> {
558 let sender_id = envelope.original_sender_id()?;
559 let buffer = self
560 .as_local()
561 .unwrap()
562 .shared_buffers
563 .get(&sender_id)
564 .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
565 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
566
567 let receipt = envelope.receipt();
568 let worktree_id = envelope.payload.worktree_id;
569 let buffer_id = envelope.payload.buffer_id;
570 let save = cx.spawn(|_, mut cx| async move {
571 buffer.update(&mut cx, |buffer, cx| buffer.save(cx))?.await
572 });
573
574 cx.background()
575 .spawn(
576 async move {
577 let (version, mtime) = save.await?;
578
579 rpc.respond(
580 receipt,
581 proto::BufferSaved {
582 worktree_id,
583 buffer_id,
584 version: (&version).into(),
585 mtime: Some(mtime.into()),
586 },
587 )
588 .await?;
589
590 Ok(())
591 }
592 .log_err(),
593 )
594 .detach();
595
596 Ok(())
597 }
598
599 pub fn handle_buffer_saved(
600 &mut self,
601 envelope: TypedEnvelope<proto::BufferSaved>,
602 _: Arc<Client>,
603 cx: &mut ModelContext<Self>,
604 ) -> Result<()> {
605 let payload = envelope.payload.clone();
606 let worktree = self.as_remote_mut().unwrap();
607 if let Some(buffer) = worktree
608 .open_buffers
609 .get(&(payload.buffer_id as usize))
610 .and_then(|buf| buf.upgrade(cx))
611 {
612 buffer.update(cx, |buffer, cx| {
613 let version = payload.version.try_into()?;
614 let mtime = payload
615 .mtime
616 .ok_or_else(|| anyhow!("missing mtime"))?
617 .into();
618 buffer.did_save(version, mtime, None, cx);
619 Result::<_, anyhow::Error>::Ok(())
620 })?;
621 }
622 Ok(())
623 }
624
625 pub fn handle_unshare(
626 &mut self,
627 _: TypedEnvelope<proto::UnshareWorktree>,
628 _: Arc<Client>,
629 cx: &mut ModelContext<Self>,
630 ) -> Result<()> {
631 cx.emit(Event::Closed);
632 Ok(())
633 }
634
635 fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
636 match self {
637 Self::Local(worktree) => {
638 let is_fake_fs = worktree.fs.is_fake();
639 worktree.snapshot = worktree.background_snapshot.lock().clone();
640 if worktree.is_scanning() {
641 if worktree.poll_task.is_none() {
642 worktree.poll_task = Some(cx.spawn(|this, mut cx| async move {
643 if is_fake_fs {
644 smol::future::yield_now().await;
645 } else {
646 smol::Timer::after(Duration::from_millis(100)).await;
647 }
648 this.update(&mut cx, |this, cx| {
649 this.as_local_mut().unwrap().poll_task = None;
650 this.poll_snapshot(cx);
651 })
652 }));
653 }
654 } else {
655 worktree.poll_task.take();
656 self.update_open_buffers(cx);
657 }
658 }
659 Self::Remote(worktree) => {
660 worktree.snapshot = worktree.snapshot_rx.borrow().clone();
661 self.update_open_buffers(cx);
662 }
663 };
664
665 cx.notify();
666 }
667
668 fn update_open_buffers(&mut self, cx: &mut ModelContext<Self>) {
669 let open_buffers: Box<dyn Iterator<Item = _>> = match &self {
670 Self::Local(worktree) => Box::new(worktree.open_buffers.iter()),
671 Self::Remote(worktree) => {
672 Box::new(worktree.open_buffers.iter().filter_map(|(id, buf)| {
673 if let RemoteBuffer::Loaded(buf) = buf {
674 Some((id, buf))
675 } else {
676 None
677 }
678 }))
679 }
680 };
681
682 let local = self.as_local().is_some();
683 let worktree_path = self.abs_path.clone();
684 let worktree_handle = cx.handle();
685 let mut buffers_to_delete = Vec::new();
686 for (buffer_id, buffer) in open_buffers {
687 if let Some(buffer) = buffer.upgrade(cx) {
688 buffer.update(cx, |buffer, cx| {
689 if let Some(old_file) = buffer.file() {
690 let new_file = if let Some(entry) = old_file
691 .entry_id()
692 .and_then(|entry_id| self.entry_for_id(entry_id))
693 {
694 File {
695 is_local: local,
696 worktree_path: worktree_path.clone(),
697 entry_id: Some(entry.id),
698 mtime: entry.mtime,
699 path: entry.path.clone(),
700 worktree: worktree_handle.clone(),
701 }
702 } else if let Some(entry) = self.entry_for_path(old_file.path().as_ref()) {
703 File {
704 is_local: local,
705 worktree_path: worktree_path.clone(),
706 entry_id: Some(entry.id),
707 mtime: entry.mtime,
708 path: entry.path.clone(),
709 worktree: worktree_handle.clone(),
710 }
711 } else {
712 File {
713 is_local: local,
714 worktree_path: worktree_path.clone(),
715 entry_id: None,
716 path: old_file.path().clone(),
717 mtime: old_file.mtime(),
718 worktree: worktree_handle.clone(),
719 }
720 };
721
722 if let Some(task) = buffer.file_updated(Box::new(new_file), cx) {
723 task.detach();
724 }
725 }
726 });
727 } else {
728 buffers_to_delete.push(*buffer_id);
729 }
730 }
731
732 for buffer_id in buffers_to_delete {
733 match self {
734 Self::Local(worktree) => {
735 worktree.open_buffers.remove(&buffer_id);
736 }
737 Self::Remote(worktree) => {
738 worktree.open_buffers.remove(&buffer_id);
739 }
740 }
741 }
742 }
743
744 fn update_diagnostics(
745 &mut self,
746 params: lsp::PublishDiagnosticsParams,
747 cx: &mut ModelContext<Worktree>,
748 ) -> Result<()> {
749 let this = self.as_local_mut().ok_or_else(|| anyhow!("not local"))?;
750 let file_path = params
751 .uri
752 .to_file_path()
753 .map_err(|_| anyhow!("URI is not a file"))?
754 .strip_prefix(&this.abs_path)
755 .context("path is not within worktree")?
756 .to_owned();
757
758 for buffer in this.open_buffers.values() {
759 if let Some(buffer) = buffer.upgrade(cx) {
760 if buffer
761 .read(cx)
762 .file()
763 .map_or(false, |file| file.path().as_ref() == file_path)
764 {
765 let (remote_id, operation) = buffer.update(cx, |buffer, cx| {
766 (
767 buffer.remote_id(),
768 buffer.update_diagnostics(params.version, params.diagnostics, cx),
769 )
770 });
771 self.send_buffer_update(remote_id, operation?, cx);
772 return Ok(());
773 }
774 }
775 }
776
777 this.diagnostics.insert(file_path, params.diagnostics);
778 Ok(())
779 }
780
781 fn send_buffer_update(
782 &mut self,
783 buffer_id: u64,
784 operation: Operation,
785 cx: &mut ModelContext<Self>,
786 ) {
787 if let Some((rpc, remote_id)) = match self {
788 Worktree::Local(worktree) => worktree
789 .remote_id
790 .borrow()
791 .map(|id| (worktree.client.clone(), id)),
792 Worktree::Remote(worktree) => Some((worktree.client.clone(), worktree.remote_id)),
793 } {
794 cx.spawn(|worktree, mut cx| async move {
795 if let Err(error) = rpc
796 .request(proto::UpdateBuffer {
797 worktree_id: remote_id,
798 buffer_id,
799 operations: vec![language::proto::serialize_operation(&operation)],
800 })
801 .await
802 {
803 worktree.update(&mut cx, |worktree, _| {
804 log::error!("error sending buffer operation: {}", error);
805 match worktree {
806 Worktree::Local(t) => &mut t.queued_operations,
807 Worktree::Remote(t) => &mut t.queued_operations,
808 }
809 .push((buffer_id, operation));
810 });
811 }
812 })
813 .detach();
814 }
815 }
816}
817
818impl Deref for Worktree {
819 type Target = Snapshot;
820
821 fn deref(&self) -> &Self::Target {
822 match self {
823 Worktree::Local(worktree) => &worktree.snapshot,
824 Worktree::Remote(worktree) => &worktree.snapshot,
825 }
826 }
827}
828
/// State of a worktree backed by a local file system.
pub struct LocalWorktree {
    /// Foreground snapshot; refreshed from `background_snapshot` in `poll_snapshot`.
    snapshot: Snapshot,
    /// Settings parsed from `.zed.toml` in the worktree root (defaults if unavailable).
    config: WorktreeConfig,
    /// Snapshot shared with the background scanner.
    background_snapshot: Arc<Mutex<Snapshot>>,
    /// Most recently reported scan state.
    last_scan_state_rx: watch::Receiver<ScanState>,
    /// Task running the background scanner; set by `Worktree::open_local`.
    _background_scanner_task: Option<Task<()>>,
    /// Task that requests a remote id whenever the client reports being connected.
    _maintain_remote_id_task: Task<Option<()>>,
    /// Pending delayed re-poll of the snapshot while a scan is in progress.
    poll_task: Option<Task<()>>,
    /// Worktree id assigned by the server; `None` while the client is not connected.
    remote_id: watch::Receiver<Option<u64>>,
    /// Sharing state; when present, completed snapshots are forwarded through it.
    share: Option<ShareState>,
    /// Weak handles to buffers opened from this worktree, keyed by buffer model id.
    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
    /// Buffers opened on behalf of peers, keyed by peer id and then by buffer id.
    shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
    /// Diagnostics received for paths without an open buffer, keyed by relative path.
    diagnostics: HashMap<PathBuf, Vec<lsp::Diagnostic>>,
    /// Collaborators currently known for this worktree, keyed by peer id.
    collaborators: HashMap<PeerId, Collaborator>,
    /// Buffer operations whose send request failed, paired with their buffer id.
    queued_operations: Vec<(u64, Operation)>,
    languages: Arc<LanguageRegistry>,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    fs: Arc<dyn Fs>,
    /// Started language servers, keyed by language name.
    language_servers: HashMap<String, Arc<LanguageServer>>,
}
850
/// Per-worktree settings deserialized from the `.zed.toml` file in the worktree root.
#[derive(Default, Deserialize)]
struct WorktreeConfig {
    /// Sent as `authorized_logins` when the worktree is registered with the server.
    collaborators: Vec<String>,
}
855
856impl LocalWorktree {
857 async fn new(
858 client: Arc<Client>,
859 user_store: ModelHandle<UserStore>,
860 path: impl Into<Arc<Path>>,
861 fs: Arc<dyn Fs>,
862 languages: Arc<LanguageRegistry>,
863 cx: &mut AsyncAppContext,
864 ) -> Result<(ModelHandle<Worktree>, Sender<ScanState>)> {
865 let abs_path = path.into();
866 let path: Arc<Path> = Arc::from(Path::new(""));
867 let next_entry_id = AtomicUsize::new(0);
868
869 // After determining whether the root entry is a file or a directory, populate the
870 // snapshot's "root name", which will be used for the purpose of fuzzy matching.
871 let root_name = abs_path
872 .file_name()
873 .map_or(String::new(), |f| f.to_string_lossy().to_string());
874 let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
875 let metadata = fs.metadata(&abs_path).await?;
876
877 let mut config = WorktreeConfig::default();
878 if let Ok(zed_toml) = fs.load(&abs_path.join(".zed.toml")).await {
879 if let Ok(parsed) = toml::from_str(&zed_toml) {
880 config = parsed;
881 }
882 }
883
884 let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
885 let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
886 let tree = cx.add_model(move |cx: &mut ModelContext<Worktree>| {
887 let mut snapshot = Snapshot {
888 id: cx.model_id(),
889 scan_id: 0,
890 abs_path,
891 root_name: root_name.clone(),
892 root_char_bag,
893 ignores: Default::default(),
894 entries_by_path: Default::default(),
895 entries_by_id: Default::default(),
896 removed_entry_ids: Default::default(),
897 next_entry_id: Arc::new(next_entry_id),
898 };
899 if let Some(metadata) = metadata {
900 snapshot.insert_entry(
901 Entry::new(
902 path.into(),
903 &metadata,
904 &snapshot.next_entry_id,
905 snapshot.root_char_bag,
906 ),
907 fs.as_ref(),
908 );
909 }
910
911 let (mut remote_id_tx, remote_id_rx) = watch::channel();
912 let _maintain_remote_id_task = cx.spawn_weak({
913 let rpc = client.clone();
914 move |this, cx| {
915 async move {
916 let mut status = rpc.status();
917 while let Some(status) = status.recv().await {
918 if let Some(this) = this.upgrade(&cx) {
919 let remote_id = if let client::Status::Connected { .. } = status {
920 let authorized_logins = this.read_with(&cx, |this, _| {
921 this.as_local().unwrap().config.collaborators.clone()
922 });
923 let response = rpc
924 .request(proto::OpenWorktree {
925 root_name: root_name.clone(),
926 authorized_logins,
927 })
928 .await?;
929
930 Some(response.worktree_id)
931 } else {
932 None
933 };
934 if remote_id_tx.send(remote_id).await.is_err() {
935 break;
936 }
937 }
938 }
939 Ok(())
940 }
941 .log_err()
942 }
943 });
944
945 let tree = Self {
946 snapshot: snapshot.clone(),
947 config,
948 remote_id: remote_id_rx,
949 background_snapshot: Arc::new(Mutex::new(snapshot)),
950 last_scan_state_rx,
951 _background_scanner_task: None,
952 _maintain_remote_id_task,
953 share: None,
954 poll_task: None,
955 open_buffers: Default::default(),
956 shared_buffers: Default::default(),
957 diagnostics: Default::default(),
958 queued_operations: Default::default(),
959 collaborators: Default::default(),
960 languages,
961 client,
962 user_store,
963 fs,
964 language_servers: Default::default(),
965 };
966
967 cx.spawn_weak(|this, mut cx| async move {
968 while let Ok(scan_state) = scan_states_rx.recv().await {
969 if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
970 let to_send = handle.update(&mut cx, |this, cx| {
971 last_scan_state_tx.blocking_send(scan_state).ok();
972 this.poll_snapshot(cx);
973 let tree = this.as_local_mut().unwrap();
974 if !tree.is_scanning() {
975 if let Some(share) = tree.share.as_ref() {
976 return Some((tree.snapshot(), share.snapshots_tx.clone()));
977 }
978 }
979 None
980 });
981
982 if let Some((snapshot, snapshots_to_send_tx)) = to_send {
983 if let Err(err) = snapshots_to_send_tx.send(snapshot).await {
984 log::error!("error submitting snapshot to send {}", err);
985 }
986 }
987 } else {
988 break;
989 }
990 }
991 })
992 .detach();
993
994 Worktree::Local(tree)
995 });
996
997 Ok((tree, scan_states_tx))
998 }
999
1000 pub fn languages(&self) -> &LanguageRegistry {
1001 &self.languages
1002 }
1003
1004 pub fn ensure_language_server(
1005 &mut self,
1006 language: &Language,
1007 cx: &mut ModelContext<Worktree>,
1008 ) -> Option<Arc<LanguageServer>> {
1009 if let Some(server) = self.language_servers.get(language.name()) {
1010 return Some(server.clone());
1011 }
1012
1013 if let Some(language_server) = language
1014 .start_server(self.abs_path(), cx)
1015 .log_err()
1016 .flatten()
1017 {
1018 let (diagnostics_tx, diagnostics_rx) = smol::channel::unbounded();
1019 language_server
1020 .on_notification::<lsp::notification::PublishDiagnostics, _>(move |params| {
1021 smol::block_on(diagnostics_tx.send(params)).ok();
1022 })
1023 .detach();
1024 cx.spawn_weak(|this, mut cx| async move {
1025 while let Ok(diagnostics) = diagnostics_rx.recv().await {
1026 if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
1027 handle.update(&mut cx, |this, cx| {
1028 this.update_diagnostics(diagnostics, cx).log_err();
1029 });
1030 } else {
1031 break;
1032 }
1033 }
1034 })
1035 .detach();
1036
1037 self.language_servers
1038 .insert(language.name().to_string(), language_server.clone());
1039 Some(language_server.clone())
1040 } else {
1041 None
1042 }
1043 }
1044
1045 pub fn open_buffer(
1046 &mut self,
1047 path: &Path,
1048 cx: &mut ModelContext<Worktree>,
1049 ) -> Task<Result<ModelHandle<Buffer>>> {
1050 let handle = cx.handle();
1051
1052 // If there is already a buffer for the given path, then return it.
1053 let mut existing_buffer = None;
1054 self.open_buffers.retain(|_buffer_id, buffer| {
1055 if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
1056 if let Some(file) = buffer.read(cx.as_ref()).file() {
1057 if file.worktree_id() == handle.id() && file.path().as_ref() == path {
1058 existing_buffer = Some(buffer);
1059 }
1060 }
1061 true
1062 } else {
1063 false
1064 }
1065 });
1066
1067 let path = Arc::from(path);
1068 cx.spawn(|this, mut cx| async move {
1069 if let Some(existing_buffer) = existing_buffer {
1070 Ok(existing_buffer)
1071 } else {
1072 let (file, contents) = this
1073 .update(&mut cx, |this, cx| this.as_local().unwrap().load(&path, cx))
1074 .await?;
1075 let language = this.read_with(&cx, |this, _| {
1076 use language::File;
1077 this.languages().select_language(file.full_path()).cloned()
1078 });
1079 let (diagnostics, language_server) = this.update(&mut cx, |this, cx| {
1080 let this = this.as_local_mut().unwrap();
1081 (
1082 this.diagnostics.remove(path.as_ref()),
1083 language
1084 .as_ref()
1085 .and_then(|language| this.ensure_language_server(language, cx)),
1086 )
1087 });
1088 let buffer = cx.add_model(|cx| {
1089 let mut buffer = Buffer::from_file(0, contents, Box::new(file), cx);
1090 buffer.set_language(language, language_server, cx);
1091 if let Some(diagnostics) = diagnostics {
1092 buffer.update_diagnostics(None, diagnostics, cx).unwrap();
1093 }
1094 buffer
1095 });
1096 this.update(&mut cx, |this, _| {
1097 let this = this
1098 .as_local_mut()
1099 .ok_or_else(|| anyhow!("must be a local worktree"))?;
1100
1101 this.open_buffers.insert(buffer.id(), buffer.downgrade());
1102 Ok(buffer)
1103 })
1104 }
1105 })
1106 }
1107
1108 pub fn open_remote_buffer(
1109 &mut self,
1110 envelope: TypedEnvelope<proto::OpenBuffer>,
1111 cx: &mut ModelContext<Worktree>,
1112 ) -> Task<Result<proto::OpenBufferResponse>> {
1113 let peer_id = envelope.original_sender_id();
1114 let path = Path::new(&envelope.payload.path);
1115
1116 let buffer = self.open_buffer(path, cx);
1117
1118 cx.spawn(|this, mut cx| async move {
1119 let buffer = buffer.await?;
1120 this.update(&mut cx, |this, cx| {
1121 this.as_local_mut()
1122 .unwrap()
1123 .shared_buffers
1124 .entry(peer_id?)
1125 .or_default()
1126 .insert(buffer.id() as u64, buffer.clone());
1127
1128 Ok(proto::OpenBufferResponse {
1129 buffer: Some(buffer.update(cx.as_mut(), |buffer, _| buffer.to_proto())),
1130 })
1131 })
1132 })
1133 }
1134
1135 pub fn close_remote_buffer(
1136 &mut self,
1137 envelope: TypedEnvelope<proto::CloseBuffer>,
1138 cx: &mut ModelContext<Worktree>,
1139 ) -> Result<()> {
1140 if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
1141 shared_buffers.remove(&envelope.payload.buffer_id);
1142 cx.notify();
1143 }
1144
1145 Ok(())
1146 }
1147
1148 pub fn add_collaborator(
1149 &mut self,
1150 collaborator: Collaborator,
1151 cx: &mut ModelContext<Worktree>,
1152 ) {
1153 self.collaborators
1154 .insert(collaborator.peer_id, collaborator);
1155 cx.notify();
1156 }
1157
1158 pub fn remove_collaborator(
1159 &mut self,
1160 envelope: TypedEnvelope<proto::RemoveCollaborator>,
1161 cx: &mut ModelContext<Worktree>,
1162 ) -> Result<()> {
1163 let peer_id = PeerId(envelope.payload.peer_id);
1164 let replica_id = self
1165 .collaborators
1166 .remove(&peer_id)
1167 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?
1168 .replica_id;
1169 self.shared_buffers.remove(&peer_id);
1170 for (_, buffer) in &self.open_buffers {
1171 if let Some(buffer) = buffer.upgrade(cx) {
1172 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
1173 }
1174 }
1175 cx.notify();
1176
1177 Ok(())
1178 }
1179
1180 pub fn scan_complete(&self) -> impl Future<Output = ()> {
1181 let mut scan_state_rx = self.last_scan_state_rx.clone();
1182 async move {
1183 let mut scan_state = Some(scan_state_rx.borrow().clone());
1184 while let Some(ScanState::Scanning) = scan_state {
1185 scan_state = scan_state_rx.recv().await;
1186 }
1187 }
1188 }
1189
1190 pub fn remote_id(&self) -> Option<u64> {
1191 *self.remote_id.borrow()
1192 }
1193
1194 pub fn next_remote_id(&self) -> impl Future<Output = Option<u64>> {
1195 let mut remote_id = self.remote_id.clone();
1196 async move {
1197 while let Some(remote_id) = remote_id.recv().await {
1198 if remote_id.is_some() {
1199 return remote_id;
1200 }
1201 }
1202 None
1203 }
1204 }
1205
1206 fn is_scanning(&self) -> bool {
1207 if let ScanState::Scanning = *self.last_scan_state_rx.borrow() {
1208 true
1209 } else {
1210 false
1211 }
1212 }
1213
1214 pub fn snapshot(&self) -> Snapshot {
1215 self.snapshot.clone()
1216 }
1217
1218 pub fn abs_path(&self) -> &Path {
1219 self.snapshot.abs_path.as_ref()
1220 }
1221
1222 pub fn contains_abs_path(&self, path: &Path) -> bool {
1223 path.starts_with(&self.snapshot.abs_path)
1224 }
1225
1226 fn absolutize(&self, path: &Path) -> PathBuf {
1227 if path.file_name().is_some() {
1228 self.snapshot.abs_path.join(path)
1229 } else {
1230 self.snapshot.abs_path.to_path_buf()
1231 }
1232 }
1233
1234 fn load(&self, path: &Path, cx: &mut ModelContext<Worktree>) -> Task<Result<(File, String)>> {
1235 let handle = cx.handle();
1236 let path = Arc::from(path);
1237 let worktree_path = self.abs_path.clone();
1238 let abs_path = self.absolutize(&path);
1239 let background_snapshot = self.background_snapshot.clone();
1240 let fs = self.fs.clone();
1241 cx.spawn(|this, mut cx| async move {
1242 let text = fs.load(&abs_path).await?;
1243 // Eagerly populate the snapshot with an updated entry for the loaded file
1244 let entry = refresh_entry(fs.as_ref(), &background_snapshot, path, &abs_path).await?;
1245 this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
1246 Ok((
1247 File {
1248 entry_id: Some(entry.id),
1249 worktree: handle,
1250 worktree_path,
1251 path: entry.path,
1252 mtime: entry.mtime,
1253 is_local: true,
1254 },
1255 text,
1256 ))
1257 })
1258 }
1259
1260 pub fn save_buffer_as(
1261 &self,
1262 buffer: ModelHandle<Buffer>,
1263 path: impl Into<Arc<Path>>,
1264 text: Rope,
1265 cx: &mut ModelContext<Worktree>,
1266 ) -> Task<Result<File>> {
1267 let save = self.save(path, text, cx);
1268 cx.spawn(|this, mut cx| async move {
1269 let entry = save.await?;
1270 this.update(&mut cx, |this, cx| {
1271 let this = this.as_local_mut().unwrap();
1272 this.open_buffers.insert(buffer.id(), buffer.downgrade());
1273 Ok(File {
1274 entry_id: Some(entry.id),
1275 worktree: cx.handle(),
1276 worktree_path: this.abs_path.clone(),
1277 path: entry.path,
1278 mtime: entry.mtime,
1279 is_local: true,
1280 })
1281 })
1282 })
1283 }
1284
1285 fn save(
1286 &self,
1287 path: impl Into<Arc<Path>>,
1288 text: Rope,
1289 cx: &mut ModelContext<Worktree>,
1290 ) -> Task<Result<Entry>> {
1291 let path = path.into();
1292 let abs_path = self.absolutize(&path);
1293 let background_snapshot = self.background_snapshot.clone();
1294 let fs = self.fs.clone();
1295 let save = cx.background().spawn(async move {
1296 fs.save(&abs_path, &text).await?;
1297 refresh_entry(fs.as_ref(), &background_snapshot, path.clone(), &abs_path).await
1298 });
1299
1300 cx.spawn(|this, mut cx| async move {
1301 let entry = save.await?;
1302 this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
1303 Ok(entry)
1304 })
1305 }
1306
1307 pub fn share(&mut self, cx: &mut ModelContext<Worktree>) -> Task<anyhow::Result<u64>> {
1308 let snapshot = self.snapshot();
1309 let share_request = self.share_request(cx);
1310 let rpc = self.client.clone();
1311 cx.spawn(|this, mut cx| async move {
1312 let share_request = if let Some(request) = share_request.await {
1313 request
1314 } else {
1315 return Err(anyhow!("failed to open worktree on the server"));
1316 };
1317
1318 let remote_id = share_request.worktree.as_ref().unwrap().id;
1319 let share_response = rpc.request(share_request).await?;
1320
1321 log::info!("sharing worktree {:?}", share_response);
1322 let (snapshots_to_send_tx, snapshots_to_send_rx) =
1323 smol::channel::unbounded::<Snapshot>();
1324
1325 cx.background()
1326 .spawn({
1327 let rpc = rpc.clone();
1328 async move {
1329 let mut prev_snapshot = snapshot;
1330 while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
1331 let message = snapshot.build_update(&prev_snapshot, remote_id, false);
1332 match rpc.send(message).await {
1333 Ok(()) => prev_snapshot = snapshot,
1334 Err(err) => log::error!("error sending snapshot diff {}", err),
1335 }
1336 }
1337 }
1338 })
1339 .detach();
1340
1341 this.update(&mut cx, |worktree, cx| {
1342 let _subscriptions = vec![
1343 rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_add_collaborator),
1344 rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_remove_collaborator),
1345 rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_open_buffer),
1346 rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_close_buffer),
1347 rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_update_buffer),
1348 rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_save_buffer),
1349 ];
1350
1351 let worktree = worktree.as_local_mut().unwrap();
1352 worktree.share = Some(ShareState {
1353 snapshots_tx: snapshots_to_send_tx,
1354 _subscriptions,
1355 });
1356 });
1357
1358 Ok(remote_id)
1359 })
1360 }
1361
1362 pub fn unshare(&mut self, cx: &mut ModelContext<Worktree>) {
1363 self.share.take();
1364 let rpc = self.client.clone();
1365 let remote_id = self.remote_id();
1366 cx.foreground()
1367 .spawn(
1368 async move {
1369 if let Some(worktree_id) = remote_id {
1370 rpc.send(proto::UnshareWorktree { worktree_id }).await?;
1371 }
1372 Ok(())
1373 }
1374 .log_err(),
1375 )
1376 .detach()
1377 }
1378
1379 fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<Option<proto::ShareWorktree>> {
1380 let remote_id = self.next_remote_id();
1381 let snapshot = self.snapshot();
1382 let root_name = self.root_name.clone();
1383 cx.background().spawn(async move {
1384 remote_id.await.map(|id| {
1385 let entries = snapshot
1386 .entries_by_path
1387 .cursor::<()>()
1388 .filter(|e| !e.is_ignored)
1389 .map(Into::into)
1390 .collect();
1391 proto::ShareWorktree {
1392 worktree: Some(proto::Worktree {
1393 id,
1394 root_name,
1395 entries,
1396 }),
1397 }
1398 })
1399 })
1400 }
1401}
1402
1403fn build_gitignore(abs_path: &Path, fs: &dyn Fs) -> Result<Gitignore> {
1404 let contents = smol::block_on(fs.load(&abs_path))?;
1405 let parent = abs_path.parent().unwrap_or(Path::new("/"));
1406 let mut builder = GitignoreBuilder::new(parent);
1407 for line in contents.lines() {
1408 builder.add_line(Some(abs_path.into()), line)?;
1409 }
1410 Ok(builder.build()?)
1411}
1412
1413impl Deref for LocalWorktree {
1414 type Target = Snapshot;
1415
1416 fn deref(&self) -> &Self::Target {
1417 &self.snapshot
1418 }
1419}
1420
1421impl fmt::Debug for LocalWorktree {
1422 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1423 self.snapshot.fmt(f)
1424 }
1425}
1426
/// State held by a local worktree while it is shared; dropped again by `unshare`.
struct ShareState {
    /// Sending half of the channel whose receiver turns snapshots into worktree updates.
    snapshots_tx: Sender<Snapshot>,
    /// Client subscriptions created when sharing started; stored here but never read.
    _subscriptions: Vec<client::Subscription>,
}
1431
/// A worktree whose buffers are opened and saved through requests sent via `client`.
pub struct RemoteWorktree {
    /// Id sent as `worktree_id` in requests concerning this worktree.
    remote_id: u64,
    /// Current snapshot of the worktree's entries.
    snapshot: Snapshot,
    // NOTE(review): presumably yields snapshots produced from applied updates; confirm
    // against the constructor, which is not visible here.
    snapshot_rx: watch::Receiver<Snapshot>,
    /// Client used to send requests and messages for this worktree.
    client: Arc<Client>,
    /// Channel onto which incoming `UpdateWorktree` payloads are forwarded.
    updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
    /// Replica id passed to buffers created from remote state.
    replica_id: ReplicaId,
    /// Buffers known to this worktree, keyed by buffer id; each is either loaded or a list of
    /// operations waiting for the buffer to load.
    open_buffers: HashMap<usize, RemoteBuffer>,
    /// Collaborators on this worktree, keyed by peer id.
    collaborators: HashMap<PeerId, Collaborator>,
    languages: Arc<LanguageRegistry>,
    user_store: ModelHandle<UserStore>,
    // NOTE(review): looks like (buffer id, operation) pairs awaiting delivery; confirm
    // against the code that drains this queue.
    queued_operations: Vec<(u64, Operation)>,
    /// Client subscriptions stored for the lifetime of the worktree; never read.
    _subscriptions: Vec<client::Subscription>,
}
1446
1447impl RemoteWorktree {
1448 pub fn open_buffer(
1449 &mut self,
1450 path: &Path,
1451 cx: &mut ModelContext<Worktree>,
1452 ) -> Task<Result<ModelHandle<Buffer>>> {
1453 let mut existing_buffer = None;
1454 self.open_buffers.retain(|_buffer_id, buffer| {
1455 if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
1456 if let Some(file) = buffer.read(cx.as_ref()).file() {
1457 if file.worktree_id() == cx.model_id() && file.path().as_ref() == path {
1458 existing_buffer = Some(buffer);
1459 }
1460 }
1461 true
1462 } else {
1463 false
1464 }
1465 });
1466
1467 let rpc = self.client.clone();
1468 let replica_id = self.replica_id;
1469 let remote_worktree_id = self.remote_id;
1470 let root_path = self.snapshot.abs_path.clone();
1471 let path = path.to_string_lossy().to_string();
1472 cx.spawn_weak(|this, mut cx| async move {
1473 if let Some(existing_buffer) = existing_buffer {
1474 Ok(existing_buffer)
1475 } else {
1476 let entry = this
1477 .upgrade(&cx)
1478 .ok_or_else(|| anyhow!("worktree was closed"))?
1479 .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned())
1480 .ok_or_else(|| anyhow!("file does not exist"))?;
1481 let response = rpc
1482 .request(proto::OpenBuffer {
1483 worktree_id: remote_worktree_id as u64,
1484 path,
1485 })
1486 .await?;
1487
1488 let this = this
1489 .upgrade(&cx)
1490 .ok_or_else(|| anyhow!("worktree was closed"))?;
1491 let file = File {
1492 entry_id: Some(entry.id),
1493 worktree: this.clone(),
1494 worktree_path: root_path,
1495 path: entry.path,
1496 mtime: entry.mtime,
1497 is_local: false,
1498 };
1499 let language = this.read_with(&cx, |this, _| {
1500 use language::File;
1501 this.languages().select_language(file.full_path()).cloned()
1502 });
1503 let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
1504 let buffer_id = remote_buffer.id as usize;
1505 let buffer = cx.add_model(|cx| {
1506 Buffer::from_proto(replica_id, remote_buffer, Some(Box::new(file)), cx)
1507 .unwrap()
1508 .with_language(language, None, cx)
1509 });
1510 this.update(&mut cx, |this, cx| {
1511 let this = this.as_remote_mut().unwrap();
1512 if let Some(RemoteBuffer::Operations(pending_ops)) = this
1513 .open_buffers
1514 .insert(buffer_id, RemoteBuffer::Loaded(buffer.downgrade()))
1515 {
1516 buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?;
1517 }
1518 Result::<_, anyhow::Error>::Ok(())
1519 })?;
1520 Ok(buffer)
1521 }
1522 })
1523 }
1524
1525 pub fn remote_id(&self) -> u64 {
1526 self.remote_id
1527 }
1528
1529 pub fn close_all_buffers(&mut self, cx: &mut MutableAppContext) {
1530 for (_, buffer) in self.open_buffers.drain() {
1531 if let RemoteBuffer::Loaded(buffer) = buffer {
1532 if let Some(buffer) = buffer.upgrade(cx) {
1533 buffer.update(cx, |buffer, cx| buffer.close(cx))
1534 }
1535 }
1536 }
1537 }
1538
1539 fn snapshot(&self) -> Snapshot {
1540 self.snapshot.clone()
1541 }
1542
1543 fn update_from_remote(
1544 &mut self,
1545 envelope: TypedEnvelope<proto::UpdateWorktree>,
1546 cx: &mut ModelContext<Worktree>,
1547 ) -> Result<()> {
1548 let mut tx = self.updates_tx.clone();
1549 let payload = envelope.payload.clone();
1550 cx.background()
1551 .spawn(async move {
1552 tx.send(payload).await.expect("receiver runs to completion");
1553 })
1554 .detach();
1555
1556 Ok(())
1557 }
1558
1559 pub fn add_collaborator(
1560 &mut self,
1561 collaborator: Collaborator,
1562 cx: &mut ModelContext<Worktree>,
1563 ) {
1564 self.collaborators
1565 .insert(collaborator.peer_id, collaborator);
1566 cx.notify();
1567 }
1568
1569 pub fn remove_collaborator(
1570 &mut self,
1571 envelope: TypedEnvelope<proto::RemoveCollaborator>,
1572 cx: &mut ModelContext<Worktree>,
1573 ) -> Result<()> {
1574 let peer_id = PeerId(envelope.payload.peer_id);
1575 let replica_id = self
1576 .collaborators
1577 .remove(&peer_id)
1578 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?
1579 .replica_id;
1580 for (_, buffer) in &self.open_buffers {
1581 if let Some(buffer) = buffer.upgrade(cx) {
1582 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
1583 }
1584 }
1585 cx.notify();
1586 Ok(())
1587 }
1588}
1589
/// An entry in a remote worktree's `open_buffers` map.
enum RemoteBuffer {
    /// Operations held for a buffer that is not loaded; they are applied when the buffer is
    /// inserted as `Loaded`.
    Operations(Vec<Operation>),
    /// A loaded buffer, referenced weakly.
    Loaded(WeakModelHandle<Buffer>),
}
1594
1595impl RemoteBuffer {
1596 fn upgrade(&self, cx: &impl UpgradeModelHandle) -> Option<ModelHandle<Buffer>> {
1597 match self {
1598 Self::Operations(_) => None,
1599 Self::Loaded(buffer) => buffer.upgrade(cx),
1600 }
1601 }
1602}
1603
/// A view of a worktree's entries, indexed both by path and by entry id.
#[derive(Clone)]
pub struct Snapshot {
    id: usize,
    /// Counter stamped onto entries when they are inserted; incremented by `apply_update`.
    scan_id: usize,
    /// Path that relative entry paths are joined onto.
    abs_path: Arc<Path>,
    root_name: String,
    /// Character bag passed along when building entries.
    root_char_bag: CharBag,
    /// Gitignore matchers keyed by the directory they apply to, each paired with the scan id
    /// at which it was last recorded or touched.
    ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
    /// All entries, keyed by path.
    entries_by_path: SumTree<Entry>,
    /// Path, ignore flag and scan id of every entry, keyed by entry id.
    entries_by_id: SumTree<PathEntry>,
    /// Ids of removed entries keyed by inode, so an entry reappearing with the same inode can
    /// take over the old id.
    removed_entry_ids: HashMap<u64, usize>,
    /// Shared counter from which new entry ids are allocated.
    next_entry_id: Arc<AtomicUsize>,
}
1617
1618impl Snapshot {
1619 pub fn id(&self) -> usize {
1620 self.id
1621 }
1622
1623 pub fn build_update(
1624 &self,
1625 other: &Self,
1626 worktree_id: u64,
1627 include_ignored: bool,
1628 ) -> proto::UpdateWorktree {
1629 let mut updated_entries = Vec::new();
1630 let mut removed_entries = Vec::new();
1631 let mut self_entries = self
1632 .entries_by_id
1633 .cursor::<()>()
1634 .filter(|e| include_ignored || !e.is_ignored)
1635 .peekable();
1636 let mut other_entries = other
1637 .entries_by_id
1638 .cursor::<()>()
1639 .filter(|e| include_ignored || !e.is_ignored)
1640 .peekable();
1641 loop {
1642 match (self_entries.peek(), other_entries.peek()) {
1643 (Some(self_entry), Some(other_entry)) => {
1644 match Ord::cmp(&self_entry.id, &other_entry.id) {
1645 Ordering::Less => {
1646 let entry = self.entry_for_id(self_entry.id).unwrap().into();
1647 updated_entries.push(entry);
1648 self_entries.next();
1649 }
1650 Ordering::Equal => {
1651 if self_entry.scan_id != other_entry.scan_id {
1652 let entry = self.entry_for_id(self_entry.id).unwrap().into();
1653 updated_entries.push(entry);
1654 }
1655
1656 self_entries.next();
1657 other_entries.next();
1658 }
1659 Ordering::Greater => {
1660 removed_entries.push(other_entry.id as u64);
1661 other_entries.next();
1662 }
1663 }
1664 }
1665 (Some(self_entry), None) => {
1666 let entry = self.entry_for_id(self_entry.id).unwrap().into();
1667 updated_entries.push(entry);
1668 self_entries.next();
1669 }
1670 (None, Some(other_entry)) => {
1671 removed_entries.push(other_entry.id as u64);
1672 other_entries.next();
1673 }
1674 (None, None) => break,
1675 }
1676 }
1677
1678 proto::UpdateWorktree {
1679 updated_entries,
1680 removed_entries,
1681 worktree_id,
1682 }
1683 }
1684
1685 fn apply_update(&mut self, update: proto::UpdateWorktree) -> Result<()> {
1686 self.scan_id += 1;
1687 let scan_id = self.scan_id;
1688
1689 let mut entries_by_path_edits = Vec::new();
1690 let mut entries_by_id_edits = Vec::new();
1691 for entry_id in update.removed_entries {
1692 let entry_id = entry_id as usize;
1693 let entry = self
1694 .entry_for_id(entry_id)
1695 .ok_or_else(|| anyhow!("unknown entry"))?;
1696 entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
1697 entries_by_id_edits.push(Edit::Remove(entry.id));
1698 }
1699
1700 for entry in update.updated_entries {
1701 let entry = Entry::try_from((&self.root_char_bag, entry))?;
1702 if let Some(PathEntry { path, .. }) = self.entries_by_id.get(&entry.id, &()) {
1703 entries_by_path_edits.push(Edit::Remove(PathKey(path.clone())));
1704 }
1705 entries_by_id_edits.push(Edit::Insert(PathEntry {
1706 id: entry.id,
1707 path: entry.path.clone(),
1708 is_ignored: entry.is_ignored,
1709 scan_id,
1710 }));
1711 entries_by_path_edits.push(Edit::Insert(entry));
1712 }
1713
1714 self.entries_by_path.edit(entries_by_path_edits, &());
1715 self.entries_by_id.edit(entries_by_id_edits, &());
1716
1717 Ok(())
1718 }
1719
1720 pub fn file_count(&self) -> usize {
1721 self.entries_by_path.summary().file_count
1722 }
1723
1724 pub fn visible_file_count(&self) -> usize {
1725 self.entries_by_path.summary().visible_file_count
1726 }
1727
1728 fn traverse_from_offset(
1729 &self,
1730 include_dirs: bool,
1731 include_ignored: bool,
1732 start_offset: usize,
1733 ) -> Traversal {
1734 let mut cursor = self.entries_by_path.cursor();
1735 cursor.seek(
1736 &TraversalTarget::Count {
1737 count: start_offset,
1738 include_dirs,
1739 include_ignored,
1740 },
1741 Bias::Right,
1742 &(),
1743 );
1744 Traversal {
1745 cursor,
1746 include_dirs,
1747 include_ignored,
1748 }
1749 }
1750
1751 fn traverse_from_path(
1752 &self,
1753 include_dirs: bool,
1754 include_ignored: bool,
1755 path: &Path,
1756 ) -> Traversal {
1757 let mut cursor = self.entries_by_path.cursor();
1758 cursor.seek(&TraversalTarget::Path(path), Bias::Left, &());
1759 Traversal {
1760 cursor,
1761 include_dirs,
1762 include_ignored,
1763 }
1764 }
1765
1766 pub fn files(&self, include_ignored: bool, start: usize) -> Traversal {
1767 self.traverse_from_offset(false, include_ignored, start)
1768 }
1769
1770 pub fn entries(&self, include_ignored: bool) -> Traversal {
1771 self.traverse_from_offset(true, include_ignored, 0)
1772 }
1773
1774 pub fn paths(&self) -> impl Iterator<Item = &Arc<Path>> {
1775 let empty_path = Path::new("");
1776 self.entries_by_path
1777 .cursor::<()>()
1778 .filter(move |entry| entry.path.as_ref() != empty_path)
1779 .map(|entry| &entry.path)
1780 }
1781
1782 fn child_entries<'a>(&'a self, parent_path: &'a Path) -> ChildEntriesIter<'a> {
1783 let mut cursor = self.entries_by_path.cursor();
1784 cursor.seek(&TraversalTarget::Path(parent_path), Bias::Right, &());
1785 let traversal = Traversal {
1786 cursor,
1787 include_dirs: true,
1788 include_ignored: true,
1789 };
1790 ChildEntriesIter {
1791 traversal,
1792 parent_path,
1793 }
1794 }
1795
1796 pub fn root_entry(&self) -> Option<&Entry> {
1797 self.entry_for_path("")
1798 }
1799
1800 pub fn root_name(&self) -> &str {
1801 &self.root_name
1802 }
1803
1804 pub fn entry_for_path(&self, path: impl AsRef<Path>) -> Option<&Entry> {
1805 let path = path.as_ref();
1806 self.traverse_from_path(true, true, path)
1807 .entry()
1808 .and_then(|entry| {
1809 if entry.path.as_ref() == path {
1810 Some(entry)
1811 } else {
1812 None
1813 }
1814 })
1815 }
1816
1817 pub fn entry_for_id(&self, id: usize) -> Option<&Entry> {
1818 let entry = self.entries_by_id.get(&id, &())?;
1819 self.entry_for_path(&entry.path)
1820 }
1821
1822 pub fn inode_for_path(&self, path: impl AsRef<Path>) -> Option<u64> {
1823 self.entry_for_path(path.as_ref()).map(|e| e.inode)
1824 }
1825
1826 fn insert_entry(&mut self, mut entry: Entry, fs: &dyn Fs) -> Entry {
1827 if !entry.is_dir() && entry.path.file_name() == Some(&GITIGNORE) {
1828 let abs_path = self.abs_path.join(&entry.path);
1829 match build_gitignore(&abs_path, fs) {
1830 Ok(ignore) => {
1831 let ignore_dir_path = entry.path.parent().unwrap();
1832 self.ignores
1833 .insert(ignore_dir_path.into(), (Arc::new(ignore), self.scan_id));
1834 }
1835 Err(error) => {
1836 log::error!(
1837 "error loading .gitignore file {:?} - {:?}",
1838 &entry.path,
1839 error
1840 );
1841 }
1842 }
1843 }
1844
1845 self.reuse_entry_id(&mut entry);
1846 self.entries_by_path.insert_or_replace(entry.clone(), &());
1847 self.entries_by_id.insert_or_replace(
1848 PathEntry {
1849 id: entry.id,
1850 path: entry.path.clone(),
1851 is_ignored: entry.is_ignored,
1852 scan_id: self.scan_id,
1853 },
1854 &(),
1855 );
1856 entry
1857 }
1858
1859 fn populate_dir(
1860 &mut self,
1861 parent_path: Arc<Path>,
1862 entries: impl IntoIterator<Item = Entry>,
1863 ignore: Option<Arc<Gitignore>>,
1864 ) {
1865 let mut parent_entry = self
1866 .entries_by_path
1867 .get(&PathKey(parent_path.clone()), &())
1868 .unwrap()
1869 .clone();
1870 if let Some(ignore) = ignore {
1871 self.ignores.insert(parent_path, (ignore, self.scan_id));
1872 }
1873 if matches!(parent_entry.kind, EntryKind::PendingDir) {
1874 parent_entry.kind = EntryKind::Dir;
1875 } else {
1876 unreachable!();
1877 }
1878
1879 let mut entries_by_path_edits = vec![Edit::Insert(parent_entry)];
1880 let mut entries_by_id_edits = Vec::new();
1881
1882 for mut entry in entries {
1883 self.reuse_entry_id(&mut entry);
1884 entries_by_id_edits.push(Edit::Insert(PathEntry {
1885 id: entry.id,
1886 path: entry.path.clone(),
1887 is_ignored: entry.is_ignored,
1888 scan_id: self.scan_id,
1889 }));
1890 entries_by_path_edits.push(Edit::Insert(entry));
1891 }
1892
1893 self.entries_by_path.edit(entries_by_path_edits, &());
1894 self.entries_by_id.edit(entries_by_id_edits, &());
1895 }
1896
1897 fn reuse_entry_id(&mut self, entry: &mut Entry) {
1898 if let Some(removed_entry_id) = self.removed_entry_ids.remove(&entry.inode) {
1899 entry.id = removed_entry_id;
1900 } else if let Some(existing_entry) = self.entry_for_path(&entry.path) {
1901 entry.id = existing_entry.id;
1902 }
1903 }
1904
1905 fn remove_path(&mut self, path: &Path) {
1906 let mut new_entries;
1907 let removed_entries;
1908 {
1909 let mut cursor = self.entries_by_path.cursor::<TraversalProgress>();
1910 new_entries = cursor.slice(&TraversalTarget::Path(path), Bias::Left, &());
1911 removed_entries = cursor.slice(&TraversalTarget::PathSuccessor(path), Bias::Left, &());
1912 new_entries.push_tree(cursor.suffix(&()), &());
1913 }
1914 self.entries_by_path = new_entries;
1915
1916 let mut entries_by_id_edits = Vec::new();
1917 for entry in removed_entries.cursor::<()>() {
1918 let removed_entry_id = self
1919 .removed_entry_ids
1920 .entry(entry.inode)
1921 .or_insert(entry.id);
1922 *removed_entry_id = cmp::max(*removed_entry_id, entry.id);
1923 entries_by_id_edits.push(Edit::Remove(entry.id));
1924 }
1925 self.entries_by_id.edit(entries_by_id_edits, &());
1926
1927 if path.file_name() == Some(&GITIGNORE) {
1928 if let Some((_, scan_id)) = self.ignores.get_mut(path.parent().unwrap()) {
1929 *scan_id = self.scan_id;
1930 }
1931 }
1932 }
1933
1934 fn ignore_stack_for_path(&self, path: &Path, is_dir: bool) -> Arc<IgnoreStack> {
1935 let mut new_ignores = Vec::new();
1936 for ancestor in path.ancestors().skip(1) {
1937 if let Some((ignore, _)) = self.ignores.get(ancestor) {
1938 new_ignores.push((ancestor, Some(ignore.clone())));
1939 } else {
1940 new_ignores.push((ancestor, None));
1941 }
1942 }
1943
1944 let mut ignore_stack = IgnoreStack::none();
1945 for (parent_path, ignore) in new_ignores.into_iter().rev() {
1946 if ignore_stack.is_path_ignored(&parent_path, true) {
1947 ignore_stack = IgnoreStack::all();
1948 break;
1949 } else if let Some(ignore) = ignore {
1950 ignore_stack = ignore_stack.append(Arc::from(parent_path), ignore);
1951 }
1952 }
1953
1954 if ignore_stack.is_path_ignored(path, is_dir) {
1955 ignore_stack = IgnoreStack::all();
1956 }
1957
1958 ignore_stack
1959 }
1960}
1961
1962impl fmt::Debug for Snapshot {
1963 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1964 for entry in self.entries_by_path.cursor::<()>() {
1965 for _ in entry.path.ancestors().skip(1) {
1966 write!(f, " ")?;
1967 }
1968 writeln!(f, "{:?} (inode: {})", entry.path, entry.inode)?;
1969 }
1970 Ok(())
1971 }
1972}
1973
/// A file within a worktree, as handed to buffers through the `language::File` trait.
#[derive(Clone, PartialEq)]
pub struct File {
    /// Id of the worktree entry backing this file; `None` means the file is reported as
    /// deleted.
    entry_id: Option<usize>,
    /// Handle to the worktree this file belongs to.
    worktree: ModelHandle<Worktree>,
    /// Path of the worktree; joined with `path` to form the absolute path of local files.
    worktree_path: Arc<Path>,
    /// Path of the file relative to `worktree_path`.
    pub path: Arc<Path>,
    /// Modification time recorded for the file.
    pub mtime: SystemTime,
    /// Whether the file belongs to a local worktree; only local files report an absolute path.
    is_local: bool,
}
1983
1984impl language::File for File {
1985 fn worktree_id(&self) -> usize {
1986 self.worktree.id()
1987 }
1988
1989 fn entry_id(&self) -> Option<usize> {
1990 self.entry_id
1991 }
1992
1993 fn mtime(&self) -> SystemTime {
1994 self.mtime
1995 }
1996
1997 fn path(&self) -> &Arc<Path> {
1998 &self.path
1999 }
2000
2001 fn abs_path(&self) -> Option<PathBuf> {
2002 if self.is_local {
2003 Some(self.worktree_path.join(&self.path))
2004 } else {
2005 None
2006 }
2007 }
2008
2009 fn full_path(&self) -> PathBuf {
2010 let mut full_path = PathBuf::new();
2011 if let Some(worktree_name) = self.worktree_path.file_name() {
2012 full_path.push(worktree_name);
2013 }
2014 full_path.push(&self.path);
2015 full_path
2016 }
2017
2018 /// Returns the last component of this handle's absolute path. If this handle refers to the root
2019 /// of its worktree, then this method will return the name of the worktree itself.
2020 fn file_name<'a>(&'a self) -> Option<OsString> {
2021 self.path
2022 .file_name()
2023 .or_else(|| self.worktree_path.file_name())
2024 .map(Into::into)
2025 }
2026
2027 fn is_deleted(&self) -> bool {
2028 self.entry_id.is_none()
2029 }
2030
2031 fn save(
2032 &self,
2033 buffer_id: u64,
2034 text: Rope,
2035 version: clock::Global,
2036 cx: &mut MutableAppContext,
2037 ) -> Task<Result<(clock::Global, SystemTime)>> {
2038 self.worktree.update(cx, |worktree, cx| match worktree {
2039 Worktree::Local(worktree) => {
2040 let rpc = worktree.client.clone();
2041 let worktree_id = *worktree.remote_id.borrow();
2042 let save = worktree.save(self.path.clone(), text, cx);
2043 cx.background().spawn(async move {
2044 let entry = save.await?;
2045 if let Some(worktree_id) = worktree_id {
2046 rpc.send(proto::BufferSaved {
2047 worktree_id,
2048 buffer_id,
2049 version: (&version).into(),
2050 mtime: Some(entry.mtime.into()),
2051 })
2052 .await?;
2053 }
2054 Ok((version, entry.mtime))
2055 })
2056 }
2057 Worktree::Remote(worktree) => {
2058 let rpc = worktree.client.clone();
2059 let worktree_id = worktree.remote_id;
2060 cx.foreground().spawn(async move {
2061 let response = rpc
2062 .request(proto::SaveBuffer {
2063 worktree_id,
2064 buffer_id,
2065 })
2066 .await?;
2067 let version = response.version.try_into()?;
2068 let mtime = response
2069 .mtime
2070 .ok_or_else(|| anyhow!("missing mtime"))?
2071 .into();
2072 Ok((version, mtime))
2073 })
2074 }
2075 })
2076 }
2077
2078 fn load_local(&self, cx: &AppContext) -> Option<Task<Result<String>>> {
2079 let worktree = self.worktree.read(cx).as_local()?;
2080 let abs_path = worktree.absolutize(&self.path);
2081 let fs = worktree.fs.clone();
2082 Some(
2083 cx.background()
2084 .spawn(async move { fs.load(&abs_path).await }),
2085 )
2086 }
2087
2088 fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) {
2089 self.worktree.update(cx, |worktree, cx| {
2090 worktree.send_buffer_update(buffer_id, operation, cx);
2091 });
2092 }
2093
2094 fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) {
2095 self.worktree.update(cx, |worktree, cx| {
2096 if let Worktree::Remote(worktree) = worktree {
2097 let worktree_id = worktree.remote_id;
2098 let rpc = worktree.client.clone();
2099 cx.background()
2100 .spawn(async move {
2101 if let Err(error) = rpc
2102 .send(proto::CloseBuffer {
2103 worktree_id,
2104 buffer_id,
2105 })
2106 .await
2107 {
2108 log::error!("error closing remote buffer: {}", error);
2109 }
2110 })
2111 .detach();
2112 }
2113 });
2114 }
2115
2116 fn boxed_clone(&self) -> Box<dyn language::File> {
2117 Box::new(self.clone())
2118 }
2119
2120 fn as_any(&self) -> &dyn Any {
2121 self
2122 }
2123}
2124
/// A file or directory tracked by a worktree snapshot.
#[derive(Clone, Debug)]
pub struct Entry {
    /// Id of the entry; allocated from a shared counter in `Entry::new`.
    pub id: usize,
    /// Whether this is a file, a scanned directory, or a directory awaiting population.
    pub kind: EntryKind,
    /// Path of the entry; this is the key under which it is stored in the path tree.
    pub path: Arc<Path>,
    /// Inode taken from the file-system metadata.
    pub inode: u64,
    /// Modification time taken from the file-system metadata.
    pub mtime: SystemTime,
    /// Symlink flag taken from the file-system metadata.
    pub is_symlink: bool,
    /// Whether the entry is ignored; ignored entries do not count as visible.
    pub is_ignored: bool,
}
2135
/// The kind of a worktree entry.
#[derive(Clone, Debug)]
pub enum EntryKind {
    /// A directory as first created from metadata; becomes `Dir` once it is populated.
    PendingDir,
    /// A directory that has been populated.
    Dir,
    /// A file, together with the character bag computed for its path.
    File(CharBag),
}
2142
2143impl Entry {
2144 fn new(
2145 path: Arc<Path>,
2146 metadata: &fs::Metadata,
2147 next_entry_id: &AtomicUsize,
2148 root_char_bag: CharBag,
2149 ) -> Self {
2150 Self {
2151 id: next_entry_id.fetch_add(1, SeqCst),
2152 kind: if metadata.is_dir {
2153 EntryKind::PendingDir
2154 } else {
2155 EntryKind::File(char_bag_for_path(root_char_bag, &path))
2156 },
2157 path,
2158 inode: metadata.inode,
2159 mtime: metadata.mtime,
2160 is_symlink: metadata.is_symlink,
2161 is_ignored: false,
2162 }
2163 }
2164
2165 pub fn is_dir(&self) -> bool {
2166 matches!(self.kind, EntryKind::Dir | EntryKind::PendingDir)
2167 }
2168
2169 pub fn is_file(&self) -> bool {
2170 matches!(self.kind, EntryKind::File(_))
2171 }
2172}
2173
2174impl sum_tree::Item for Entry {
2175 type Summary = EntrySummary;
2176
2177 fn summary(&self) -> Self::Summary {
2178 let visible_count = if self.is_ignored { 0 } else { 1 };
2179 let file_count;
2180 let visible_file_count;
2181 if self.is_file() {
2182 file_count = 1;
2183 visible_file_count = visible_count;
2184 } else {
2185 file_count = 0;
2186 visible_file_count = 0;
2187 }
2188
2189 EntrySummary {
2190 max_path: self.path.clone(),
2191 count: 1,
2192 visible_count,
2193 file_count,
2194 visible_file_count,
2195 }
2196 }
2197}
2198
2199impl sum_tree::KeyedItem for Entry {
2200 type Key = PathKey;
2201
2202 fn key(&self) -> Self::Key {
2203 PathKey(self.path.clone())
2204 }
2205}
2206
/// Summary of a run of entries in the path tree.
#[derive(Clone, Debug)]
pub struct EntrySummary {
    /// Path of the most recently summarized entry.
    max_path: Arc<Path>,
    /// Number of entries; each entry contributes one.
    count: usize,
    /// Number of entries that are not ignored.
    visible_count: usize,
    /// Number of entries that are files.
    file_count: usize,
    /// Number of entries that are files and not ignored.
    visible_file_count: usize,
}
2215
2216impl Default for EntrySummary {
2217 fn default() -> Self {
2218 Self {
2219 max_path: Arc::from(Path::new("")),
2220 count: 0,
2221 visible_count: 0,
2222 file_count: 0,
2223 visible_file_count: 0,
2224 }
2225 }
2226}
2227
2228impl sum_tree::Summary for EntrySummary {
2229 type Context = ();
2230
2231 fn add_summary(&mut self, rhs: &Self, _: &()) {
2232 self.max_path = rhs.max_path.clone();
2233 self.visible_count += rhs.visible_count;
2234 self.file_count += rhs.file_count;
2235 self.visible_file_count += rhs.visible_file_count;
2236 }
2237}
2238
/// Item of the id-keyed tree: maps an entry id to that entry's path and basic state.
#[derive(Clone, Debug)]
struct PathEntry {
    /// Id of the entry; this is the key in the id tree.
    id: usize,
    /// Path under which the entry is stored in the path tree.
    path: Arc<Path>,
    /// Copy of the entry's ignored flag.
    is_ignored: bool,
    /// Scan id of the snapshot at the time the item was written.
    scan_id: usize,
}
2246
2247impl sum_tree::Item for PathEntry {
2248 type Summary = PathEntrySummary;
2249
2250 fn summary(&self) -> Self::Summary {
2251 PathEntrySummary { max_id: self.id }
2252 }
2253}
2254
2255impl sum_tree::KeyedItem for PathEntry {
2256 type Key = usize;
2257
2258 fn key(&self) -> Self::Key {
2259 self.id
2260 }
2261}
2262
/// Summary of a run of items in the id-keyed tree.
#[derive(Clone, Debug, Default)]
struct PathEntrySummary {
    /// Id of the most recently summarized item.
    max_id: usize,
}
2267
2268impl sum_tree::Summary for PathEntrySummary {
2269 type Context = ();
2270
2271 fn add_summary(&mut self, summary: &Self, _: &Self::Context) {
2272 self.max_id = summary.max_id;
2273 }
2274}
2275
2276impl<'a> sum_tree::Dimension<'a, PathEntrySummary> for usize {
2277 fn add_summary(&mut self, summary: &'a PathEntrySummary, _: &()) {
2278 *self = summary.max_id;
2279 }
2280}
2281
/// Key of the path-keyed entry tree: an entry's path, ordered by the derived `Ord`.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct PathKey(Arc<Path>);
2284
2285impl Default for PathKey {
2286 fn default() -> Self {
2287 Self(Path::new("").into())
2288 }
2289}
2290
2291impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
2292 fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
2293 self.0 = summary.max_path.clone();
2294 }
2295}
2296
/// Scans a worktree's directory tree and processes file-system events, updating a shared
/// snapshot and reporting scan state changes.
struct BackgroundScanner {
    /// File-system abstraction used to read directories.
    fs: Arc<dyn Fs>,
    /// Snapshot shared behind a mutex; locked briefly for each read of its state.
    snapshot: Arc<Mutex<Snapshot>>,
    /// Channel on which `ScanState` transitions are reported.
    notify: Sender<ScanState>,
    /// Background executor used to run directory scans concurrently.
    executor: Arc<executor::Background>,
}
2303
2304impl BackgroundScanner {
2305 fn new(
2306 snapshot: Arc<Mutex<Snapshot>>,
2307 notify: Sender<ScanState>,
2308 fs: Arc<dyn Fs>,
2309 executor: Arc<executor::Background>,
2310 ) -> Self {
2311 Self {
2312 fs,
2313 snapshot,
2314 notify,
2315 executor,
2316 }
2317 }
2318
2319 fn abs_path(&self) -> Arc<Path> {
2320 self.snapshot.lock().abs_path.clone()
2321 }
2322
2323 fn snapshot(&self) -> Snapshot {
2324 self.snapshot.lock().clone()
2325 }
2326
2327 async fn run(mut self, events_rx: impl Stream<Item = Vec<fsevent::Event>>) {
2328 if self.notify.send(ScanState::Scanning).await.is_err() {
2329 return;
2330 }
2331
2332 if let Err(err) = self.scan_dirs().await {
2333 if self
2334 .notify
2335 .send(ScanState::Err(Arc::new(err)))
2336 .await
2337 .is_err()
2338 {
2339 return;
2340 }
2341 }
2342
2343 if self.notify.send(ScanState::Idle).await.is_err() {
2344 return;
2345 }
2346
2347 futures::pin_mut!(events_rx);
2348 while let Some(events) = events_rx.next().await {
2349 if self.notify.send(ScanState::Scanning).await.is_err() {
2350 break;
2351 }
2352
2353 if !self.process_events(events).await {
2354 break;
2355 }
2356
2357 if self.notify.send(ScanState::Idle).await.is_err() {
2358 break;
2359 }
2360 }
2361 }
2362
2363 async fn scan_dirs(&mut self) -> Result<()> {
2364 let root_char_bag;
2365 let next_entry_id;
2366 let is_dir;
2367 {
2368 let snapshot = self.snapshot.lock();
2369 root_char_bag = snapshot.root_char_bag;
2370 next_entry_id = snapshot.next_entry_id.clone();
2371 is_dir = snapshot.root_entry().map_or(false, |e| e.is_dir())
2372 };
2373
2374 if is_dir {
2375 let path: Arc<Path> = Arc::from(Path::new(""));
2376 let abs_path = self.abs_path();
2377 let (tx, rx) = channel::unbounded();
2378 tx.send(ScanJob {
2379 abs_path: abs_path.to_path_buf(),
2380 path,
2381 ignore_stack: IgnoreStack::none(),
2382 scan_queue: tx.clone(),
2383 })
2384 .await
2385 .unwrap();
2386 drop(tx);
2387
2388 self.executor
2389 .scoped(|scope| {
2390 for _ in 0..self.executor.num_cpus() {
2391 scope.spawn(async {
2392 while let Ok(job) = rx.recv().await {
2393 if let Err(err) = self
2394 .scan_dir(root_char_bag, next_entry_id.clone(), &job)
2395 .await
2396 {
2397 log::error!("error scanning {:?}: {}", job.abs_path, err);
2398 }
2399 }
2400 });
2401 }
2402 })
2403 .await;
2404 }
2405
2406 Ok(())
2407 }
2408
    /// Scans the single directory described by `job` and records its children in the snapshot.
    ///
    /// For every child yielded by `fs.read_dir`, an `Entry` is built from the child's
    /// metadata and its ignore status is computed from the current ignore stack. Child
    /// directories additionally produce a new `ScanJob`, which is pushed onto
    /// `job.scan_queue` after the entries have been stored via `populate_dir`.
    ///
    /// Returns an error if the directory cannot be listed or if reading a child's metadata
    /// fails. A child whose listing result is an error is logged and skipped, as is a
    /// child for which no metadata is returned.
    async fn scan_dir(
        &self,
        root_char_bag: CharBag,
        next_entry_id: Arc<AtomicUsize>,
        job: &ScanJob,
    ) -> Result<()> {
        let mut new_entries: Vec<Entry> = Vec::new();
        let mut new_jobs: Vec<ScanJob> = Vec::new();
        let mut ignore_stack = job.ignore_stack.clone();
        let mut new_ignore = None;

        let mut child_paths = self.fs.read_dir(&job.abs_path).await?;
        while let Some(child_abs_path) = child_paths.next().await {
            let child_abs_path = match child_abs_path {
                Ok(child_abs_path) => child_abs_path,
                Err(error) => {
                    log::error!("error processing entry {:?}", error);
                    continue;
                }
            };
            let child_name = child_abs_path.file_name().unwrap();
            let child_path: Arc<Path> = job.path.join(child_name).into();
            let child_metadata = match self.fs.metadata(&child_abs_path).await? {
                Some(metadata) => metadata,
                None => continue,
            };

            // If we find a .gitignore, add it to the stack of ignores used to determine which
            // paths are ignored.
            if child_name == *GITIGNORE {
                match build_gitignore(&child_abs_path, self.fs.as_ref()) {
                    Ok(ignore) => {
                        let ignore = Arc::new(ignore);
                        ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
                        new_ignore = Some(ignore);
                    }
                    Err(error) => {
                        log::error!(
                            "error loading .gitignore file {:?} - {:?}",
                            child_name,
                            error
                        );
                    }
                }

                // Update the ignore status of any child entries we've already processed to
                // reflect the ignore file in the current directory. Because `.gitignore`
                // starts with a `.`, there should rarely be many such entries (presumably
                // because it tends to be listed early). Update the ignore stack associated
                // with any new jobs as well.
                //
                // `new_jobs` holds exactly one job per directory in `new_entries`, in the same
                // order, so advancing this iterator once per directory entry pairs them up.
                let mut new_jobs = new_jobs.iter_mut();
                for entry in &mut new_entries {
                    entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
                    if entry.is_dir() {
                        new_jobs.next().unwrap().ignore_stack = if entry.is_ignored {
                            IgnoreStack::all()
                        } else {
                            ignore_stack.clone()
                        };
                    }
                }
            }

            let mut child_entry = Entry::new(
                child_path.clone(),
                &child_metadata,
                &next_entry_id,
                root_char_bag,
            );

            if child_metadata.is_dir {
                let is_ignored = ignore_stack.is_path_ignored(&child_path, true);
                child_entry.is_ignored = is_ignored;
                new_entries.push(child_entry);
                // An ignored directory hands `IgnoreStack::all()` to its scan job instead of
                // the accumulated stack.
                new_jobs.push(ScanJob {
                    abs_path: child_abs_path,
                    path: child_path,
                    ignore_stack: if is_ignored {
                        IgnoreStack::all()
                    } else {
                        ignore_stack.clone()
                    },
                    scan_queue: job.scan_queue.clone(),
                });
            } else {
                child_entry.is_ignored = ignore_stack.is_path_ignored(&child_path, false);
                new_entries.push(child_entry);
            };
        }

        // Store this directory's entries before queueing the jobs for its sub-directories.
        self.snapshot
            .lock()
            .populate_dir(job.path.clone(), new_entries, new_ignore);
        for new_job in new_jobs {
            job.scan_queue.send(new_job).await.unwrap();
        }

        Ok(())
    }
2506
    /// Applies one batch of file-system events to the snapshot.
    ///
    /// Works on a private copy of the snapshot: the scan id is bumped, every affected path
    /// is removed, and each path for which metadata can still be read is re-inserted. The
    /// copy is then published, directories among the re-inserted paths are rescanned in
    /// parallel, and ignore statuses are refreshed.
    ///
    /// Returns `false` if the snapshot's root path cannot be canonicalized, `true` otherwise.
    async fn process_events(&mut self, mut events: Vec<fsevent::Event>) -> bool {
        let mut snapshot = self.snapshot();
        snapshot.scan_id += 1;

        let root_abs_path = if let Ok(abs_path) = self.fs.canonicalize(&snapshot.abs_path).await {
            abs_path
        } else {
            return false;
        };
        let root_char_bag = snapshot.root_char_bag;
        let next_entry_id = snapshot.next_entry_id.clone();

        // Sort by path, then drop every event whose path starts with the path of the
        // previously retained event, so only the outermost changed paths are processed.
        events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
        events.dedup_by(|a, b| a.path.starts_with(&b.path));

        // First pass: remove every affected path from the working copy.
        for event in &events {
            match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => snapshot.remove_path(&path),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            }
        }

        // Second pass: re-insert whatever still has metadata, queueing directories for a
        // rescan.
        let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
        for event in events {
            let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) {
                Ok(path) => Arc::from(path.to_path_buf()),
                Err(_) => {
                    log::error!(
                        "unexpected event {:?} for root path {:?}",
                        event.path,
                        root_abs_path
                    );
                    continue;
                }
            };

            match self.fs.metadata(&event.path).await {
                Ok(Some(metadata)) => {
                    let ignore_stack = snapshot.ignore_stack_for_path(&path, metadata.is_dir);
                    let mut fs_entry = Entry::new(
                        path.clone(),
                        &metadata,
                        snapshot.next_entry_id.as_ref(),
                        snapshot.root_char_bag,
                    );
                    fs_entry.is_ignored = ignore_stack.is_all();
                    snapshot.insert_entry(fs_entry, self.fs.as_ref());
                    if metadata.is_dir {
                        scan_queue_tx
                            .send(ScanJob {
                                abs_path: event.path,
                                path,
                                ignore_stack,
                                scan_queue: scan_queue_tx.clone(),
                            })
                            .await
                            .unwrap();
                    }
                }
                // No metadata: the path stays removed.
                Ok(None) => {}
                Err(err) => {
                    // TODO - create a special 'error' entry in the entries tree to mark this
                    log::error!("error reading file on event {:?}", err);
                }
            }
        }

        // Publish the working copy before the scan tasks start; `scan_dir` writes its
        // results through `self.snapshot`.
        *self.snapshot.lock() = snapshot;

        // Scan any directories that were created as part of this event batch.
        drop(scan_queue_tx);
        self.executor
            .scoped(|scope| {
                for _ in 0..self.executor.num_cpus() {
                    scope.spawn(async {
                        while let Ok(job) = scan_queue_rx.recv().await {
                            if let Err(err) = self
                                .scan_dir(root_char_bag, next_entry_id.clone(), &job)
                                .await
                            {
                                log::error!("error scanning {:?}: {}", job.abs_path, err);
                            }
                        }
                    });
                }
            })
            .await;

        // Attempt to detect renames only over a single batch of file-system events.
        self.snapshot.lock().removed_entry_ids.clear();

        self.update_ignore_statuses().await;
        true
    }
2608
    /// Refreshes ignore statuses after a batch of events has been applied.
    ///
    /// Gitignores stamped with the current scan id, and whose directory still has an entry,
    /// are queued for re-evaluation. Gitignores whose `.gitignore` entry no longer exists are
    /// removed from both the local copy and the mutex-protected snapshot. The queued
    /// directories are then processed in parallel by `update_ignore_status`.
    async fn update_ignore_statuses(&self) {
        let mut snapshot = self.snapshot();

        let mut ignores_to_update = Vec::new();
        let mut ignores_to_delete = Vec::new();
        for (parent_path, (_, scan_id)) in &snapshot.ignores {
            if *scan_id == snapshot.scan_id && snapshot.entry_for_path(parent_path).is_some() {
                ignores_to_update.push(parent_path.clone());
            }

            let ignore_path = parent_path.join(&*GITIGNORE);
            if snapshot.entry_for_path(ignore_path).is_none() {
                ignores_to_delete.push(parent_path.clone());
            }
        }

        // Removals are collected first because `snapshot.ignores` is borrowed by the loop
        // above.
        for parent_path in ignores_to_delete {
            snapshot.ignores.remove(&parent_path);
            self.snapshot.lock().ignores.remove(&parent_path);
        }

        let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
        ignores_to_update.sort_unstable();
        let mut ignores_to_update = ignores_to_update.into_iter().peekable();
        while let Some(parent_path) = ignores_to_update.next() {
            // Skip the following paths that start with this one: `update_ignore_status`
            // queues every child directory itself, so they are reached from this job.
            while ignores_to_update
                .peek()
                .map_or(false, |p| p.starts_with(&parent_path))
            {
                ignores_to_update.next().unwrap();
            }

            let ignore_stack = snapshot.ignore_stack_for_path(&parent_path, true);
            ignore_queue_tx
                .send(UpdateIgnoreStatusJob {
                    path: parent_path,
                    ignore_stack,
                    ignore_queue: ignore_queue_tx.clone(),
                })
                .await
                .unwrap();
        }
        // Only the sender clones carried by queued jobs remain after this.
        drop(ignore_queue_tx);

        self.executor
            .scoped(|scope| {
                for _ in 0..self.executor.num_cpus() {
                    scope.spawn(async {
                        while let Ok(job) = ignore_queue_rx.recv().await {
                            self.update_ignore_status(job, &snapshot).await;
                        }
                    });
                }
            })
            .await;
    }
2665
2666 async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) {
2667 let mut ignore_stack = job.ignore_stack;
2668 if let Some((ignore, _)) = snapshot.ignores.get(&job.path) {
2669 ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
2670 }
2671
2672 let mut entries_by_id_edits = Vec::new();
2673 let mut entries_by_path_edits = Vec::new();
2674 for mut entry in snapshot.child_entries(&job.path).cloned() {
2675 let was_ignored = entry.is_ignored;
2676 entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
2677 if entry.is_dir() {
2678 let child_ignore_stack = if entry.is_ignored {
2679 IgnoreStack::all()
2680 } else {
2681 ignore_stack.clone()
2682 };
2683 job.ignore_queue
2684 .send(UpdateIgnoreStatusJob {
2685 path: entry.path.clone(),
2686 ignore_stack: child_ignore_stack,
2687 ignore_queue: job.ignore_queue.clone(),
2688 })
2689 .await
2690 .unwrap();
2691 }
2692
2693 if entry.is_ignored != was_ignored {
2694 let mut path_entry = snapshot.entries_by_id.get(&entry.id, &()).unwrap().clone();
2695 path_entry.scan_id = snapshot.scan_id;
2696 path_entry.is_ignored = entry.is_ignored;
2697 entries_by_id_edits.push(Edit::Insert(path_entry));
2698 entries_by_path_edits.push(Edit::Insert(entry));
2699 }
2700 }
2701
2702 let mut snapshot = self.snapshot.lock();
2703 snapshot.entries_by_path.edit(entries_by_path_edits, &());
2704 snapshot.entries_by_id.edit(entries_by_id_edits, &());
2705 }
2706}
2707
2708async fn refresh_entry(
2709 fs: &dyn Fs,
2710 snapshot: &Mutex<Snapshot>,
2711 path: Arc<Path>,
2712 abs_path: &Path,
2713) -> Result<Entry> {
2714 let root_char_bag;
2715 let next_entry_id;
2716 {
2717 let snapshot = snapshot.lock();
2718 root_char_bag = snapshot.root_char_bag;
2719 next_entry_id = snapshot.next_entry_id.clone();
2720 }
2721 let entry = Entry::new(
2722 path,
2723 &fs.metadata(abs_path)
2724 .await?
2725 .ok_or_else(|| anyhow!("could not read saved file metadata"))?,
2726 &next_entry_id,
2727 root_char_bag,
2728 );
2729 Ok(snapshot.lock().insert_entry(entry, fs))
2730}
2731
2732fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
2733 let mut result = root_char_bag;
2734 result.extend(
2735 path.to_string_lossy()
2736 .chars()
2737 .map(|c| c.to_ascii_lowercase()),
2738 );
2739 result
2740}
2741
/// A unit of work for `BackgroundScanner::scan_dir`: one directory to be listed.
struct ScanJob {
    /// Absolute path of the directory, passed to `fs.read_dir`.
    abs_path: PathBuf,
    /// The directory's path as used for snapshot entries; child paths are formed by
    /// joining onto it.
    path: Arc<Path>,
    /// Ignore rules in effect for this directory's children.
    ignore_stack: Arc<IgnoreStack>,
    /// Queue onto which jobs for sub-directories are pushed.
    scan_queue: Sender<ScanJob>,
}
2748
/// A unit of work for `BackgroundScanner::update_ignore_status`: one directory whose
/// children need their ignore status re-evaluated.
struct UpdateIgnoreStatusJob {
    /// Path of the directory whose children are re-evaluated.
    path: Arc<Path>,
    /// Ignore rules inherited by this directory; its own gitignore, if any, is appended
    /// when the job is processed.
    ignore_stack: Arc<IgnoreStack>,
    /// Queue onto which jobs for child directories are pushed.
    ignore_queue: Sender<UpdateIgnoreStatusJob>,
}
2754
/// Extension trait for worktree handles. Its only method is compiled under `cfg(test)`.
pub trait WorktreeHandle {
    /// Returns a future that completes once the worktree has caught up with file-system
    /// events; see the implementation for `ModelHandle<Worktree>` for how this is done.
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()>;
}
2762
2763impl WorktreeHandle for ModelHandle<Worktree> {
2764 // When the worktree's FS event stream sometimes delivers "redundant" events for FS changes that
2765 // occurred before the worktree was constructed. These events can cause the worktree to perfrom
2766 // extra directory scans, and emit extra scan-state notifications.
2767 //
2768 // This function mutates the worktree's directory and waits for those mutations to be picked up,
2769 // to ensure that all redundant FS events have already been processed.
2770 #[cfg(test)]
2771 fn flush_fs_events<'a>(
2772 &self,
2773 cx: &'a gpui::TestAppContext,
2774 ) -> futures::future::LocalBoxFuture<'a, ()> {
2775 use smol::future::FutureExt;
2776
2777 let filename = "fs-event-sentinel";
2778 let root_path = cx.read(|cx| self.read(cx).abs_path.clone());
2779 let tree = self.clone();
2780 async move {
2781 std::fs::write(root_path.join(filename), "").unwrap();
2782 tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_some())
2783 .await;
2784
2785 std::fs::remove_file(root_path.join(filename)).unwrap();
2786 tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_none())
2787 .await;
2788
2789 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
2790 .await;
2791 }
2792 .boxed_local()
2793 }
2794}
2795
/// Running totals accumulated while moving through the entries tree.
#[derive(Clone, Debug)]
struct TraversalProgress<'a> {
    /// `max_path` of the most recently added summary.
    max_path: &'a Path,
    /// Sum of the summaries' `count`.
    count: usize,
    /// Sum of the summaries' `visible_count`.
    visible_count: usize,
    /// Sum of the summaries' `file_count`.
    file_count: usize,
    /// Sum of the summaries' `visible_file_count`.
    visible_file_count: usize,
}

impl<'a> TraversalProgress<'a> {
    /// Picks the counter that matches the given filters:
    ///
    /// | `include_ignored` | `include_dirs` | counter              |
    /// |-------------------|----------------|----------------------|
    /// | `true`            | `true`         | `count`              |
    /// | `true`            | `false`        | `file_count`         |
    /// | `false`           | `true`         | `visible_count`      |
    /// | `false`           | `false`        | `visible_file_count` |
    fn count(&self, include_dirs: bool, include_ignored: bool) -> usize {
        if include_ignored {
            if include_dirs {
                self.count
            } else {
                self.file_count
            }
        } else if include_dirs {
            self.visible_count
        } else {
            self.visible_file_count
        }
    }
}
2815
2816impl<'a> sum_tree::Dimension<'a, EntrySummary> for TraversalProgress<'a> {
2817 fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
2818 self.max_path = summary.max_path.as_ref();
2819 self.count += summary.count;
2820 self.visible_count += summary.visible_count;
2821 self.file_count += summary.file_count;
2822 self.visible_file_count += summary.visible_file_count;
2823 }
2824}
2825
2826impl<'a> Default for TraversalProgress<'a> {
2827 fn default() -> Self {
2828 Self {
2829 max_path: Path::new(""),
2830 count: 0,
2831 visible_count: 0,
2832 file_count: 0,
2833 visible_file_count: 0,
2834 }
2835 }
2836}
2837
/// A forward cursor over worktree entries that can filter out directories and/or ignored
/// entries.
pub struct Traversal<'a> {
    /// Cursor over the entries tree, tracking `TraversalProgress` as its dimension.
    cursor: sum_tree::Cursor<'a, Entry, TraversalProgress<'a>>,
    /// Whether ignored entries count towards offsets and are accepted as siblings.
    include_ignored: bool,
    /// Whether directories count towards offsets and are accepted as siblings.
    include_dirs: bool,
}
2843
2844impl<'a> Traversal<'a> {
2845 pub fn advance(&mut self) -> bool {
2846 self.advance_to_offset(self.offset() + 1)
2847 }
2848
2849 pub fn advance_to_offset(&mut self, offset: usize) -> bool {
2850 self.cursor.seek_forward(
2851 &TraversalTarget::Count {
2852 count: offset,
2853 include_dirs: self.include_dirs,
2854 include_ignored: self.include_ignored,
2855 },
2856 Bias::Right,
2857 &(),
2858 )
2859 }
2860
2861 pub fn advance_to_sibling(&mut self) -> bool {
2862 while let Some(entry) = self.cursor.item() {
2863 self.cursor.seek_forward(
2864 &TraversalTarget::PathSuccessor(&entry.path),
2865 Bias::Left,
2866 &(),
2867 );
2868 if let Some(entry) = self.cursor.item() {
2869 if (self.include_dirs || !entry.is_dir())
2870 && (self.include_ignored || !entry.is_ignored)
2871 {
2872 return true;
2873 }
2874 }
2875 }
2876 false
2877 }
2878
2879 pub fn entry(&self) -> Option<&'a Entry> {
2880 self.cursor.item()
2881 }
2882
2883 pub fn offset(&self) -> usize {
2884 self.cursor
2885 .start()
2886 .count(self.include_dirs, self.include_ignored)
2887 }
2888}
2889
2890impl<'a> Iterator for Traversal<'a> {
2891 type Item = &'a Entry;
2892
2893 fn next(&mut self) -> Option<Self::Item> {
2894 if let Some(item) = self.entry() {
2895 self.advance();
2896 Some(item)
2897 } else {
2898 None
2899 }
2900 }
2901}
2902
/// Seek targets understood by a cursor whose dimension is `TraversalProgress`.
#[derive(Debug)]
enum TraversalTarget<'a> {
    /// Compared against the cursor's `max_path` using path ordering.
    Path(&'a Path),
    /// Compares as `Greater` while the cursor's `max_path` starts with this path, and as
    /// `Equal` as soon as it does not.
    PathSuccessor(&'a Path),
    /// Compared against the cursor's running count for the given filters.
    Count {
        count: usize,
        include_ignored: bool,
        include_dirs: bool,
    },
}
2913
2914impl<'a, 'b> SeekTarget<'a, EntrySummary, TraversalProgress<'a>> for TraversalTarget<'b> {
2915 fn cmp(&self, cursor_location: &TraversalProgress<'a>, _: &()) -> Ordering {
2916 match self {
2917 TraversalTarget::Path(path) => path.cmp(&cursor_location.max_path),
2918 TraversalTarget::PathSuccessor(path) => {
2919 if !cursor_location.max_path.starts_with(path) {
2920 Ordering::Equal
2921 } else {
2922 Ordering::Greater
2923 }
2924 }
2925 TraversalTarget::Count {
2926 count,
2927 include_dirs,
2928 include_ignored,
2929 } => Ord::cmp(
2930 count,
2931 &cursor_location.count(*include_dirs, *include_ignored),
2932 ),
2933 }
2934 }
2935}
2936
/// Iterator over entries beneath `parent_path` that skips from one entry to its next
/// sibling instead of descending into it.
struct ChildEntriesIter<'a> {
    /// Iteration ends at the first entry whose path does not start with this path.
    parent_path: &'a Path,
    /// Underlying traversal supplying the entries.
    traversal: Traversal<'a>,
}
2941
2942impl<'a> Iterator for ChildEntriesIter<'a> {
2943 type Item = &'a Entry;
2944
2945 fn next(&mut self) -> Option<Self::Item> {
2946 if let Some(item) = self.traversal.entry() {
2947 if item.path.starts_with(&self.parent_path) {
2948 self.traversal.advance_to_sibling();
2949 return Some(item);
2950 }
2951 }
2952 None
2953 }
2954}
2955
2956impl<'a> From<&'a Entry> for proto::Entry {
2957 fn from(entry: &'a Entry) -> Self {
2958 Self {
2959 id: entry.id as u64,
2960 is_dir: entry.is_dir(),
2961 path: entry.path.to_string_lossy().to_string(),
2962 inode: entry.inode,
2963 mtime: Some(entry.mtime.into()),
2964 is_symlink: entry.is_symlink,
2965 is_ignored: entry.is_ignored,
2966 }
2967 }
2968}
2969
2970impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
2971 type Error = anyhow::Error;
2972
2973 fn try_from((root_char_bag, entry): (&'a CharBag, proto::Entry)) -> Result<Self> {
2974 if let Some(mtime) = entry.mtime {
2975 let kind = if entry.is_dir {
2976 EntryKind::Dir
2977 } else {
2978 let mut char_bag = root_char_bag.clone();
2979 char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase()));
2980 EntryKind::File(char_bag)
2981 };
2982 let path: Arc<Path> = Arc::from(Path::new(&entry.path));
2983 Ok(Entry {
2984 id: entry.id as usize,
2985 kind,
2986 path: path.clone(),
2987 inode: entry.inode,
2988 mtime: mtime.into(),
2989 is_symlink: entry.is_symlink,
2990 is_ignored: entry.is_ignored,
2991 })
2992 } else {
2993 Err(anyhow!(
2994 "missing mtime in remote worktree entry {:?}",
2995 entry.path
2996 ))
2997 }
2998 }
2999}
3000
3001#[cfg(test)]
3002mod tests {
3003 use super::*;
3004 use crate::fs::FakeFs;
3005 use anyhow::Result;
3006 use client::test::{FakeHttpClient, FakeServer};
3007 use fs::RealFs;
3008 use language::{tree_sitter_rust, LanguageServerConfig};
3009 use language::{Diagnostic, LanguageConfig};
3010 use lsp::Url;
3011 use rand::prelude::*;
3012 use serde_json::json;
3013 use std::{cell::RefCell, rc::Rc};
3014 use std::{
3015 env,
3016 fmt::Write,
3017 time::{SystemTime, UNIX_EPOCH},
3018 };
3019 use text::{Patch, Point};
3020 use util::test::temp_tree;
3021
    // Checks that `entries(false)` on a worktree over a fake file system leaves out a file
    // matched by the root `.gitignore`.
    #[gpui::test]
    async fn test_traversal(mut cx: gpui::TestAppContext) {
        let fs = FakeFs::new();
        fs.insert_tree(
            "/root",
            json!({
               ".gitignore": "a/b\n",
               "a": {
                   "b": "",
                   "c": "",
               }
            }),
        )
        .await;

        let client = Client::new();
        let http_client = FakeHttpClient::with_404_response();
        let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));

        let tree = Worktree::open_local(
            client,
            user_store,
            Arc::from(Path::new("/root")),
            Arc::new(fs),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;

        // `a/b` is matched by the gitignore and must be absent; the root entry has the
        // empty path.
        tree.read_with(&cx, |tree, _| {
            assert_eq!(
                tree.entries(false)
                    .map(|entry| entry.path.as_ref())
                    .collect::<Vec<_>>(),
                vec![
                    Path::new(""),
                    Path::new(".gitignore"),
                    Path::new("a"),
                    Path::new("a/c"),
                ]
            );
        })
    }
3068
    // Checks that saving an edited buffer writes the buffer's text to the file on disk.
    #[gpui::test]
    async fn test_save_file(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));

        let client = Client::new();
        let http_client = FakeHttpClient::with_404_response();
        let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));

        let tree = Worktree::open_local(
            client,
            user_store,
            dir.path(),
            Arc::new(RealFs),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
            .await
            .unwrap();
        // Prepend 10 * 1024 lines of text before saving.
        let save = buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        let new_text = std::fs::read_to_string(dir.path().join("file1")).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
3102
    // Same as `test_save_file`, but the worktree is opened on the file itself, so the
    // buffer is opened with the empty relative path.
    #[gpui::test]
    async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "file1": "the old contents",
        }));
        let file_path = dir.path().join("file1");

        let client = Client::new();
        let http_client = FakeHttpClient::with_404_response();
        let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));

        let tree = Worktree::open_local(
            client,
            user_store,
            file_path.clone(),
            Arc::new(RealFs),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        cx.read(|cx| assert_eq!(tree.read(cx).file_count(), 1));

        let buffer = tree
            .update(&mut cx, |tree, cx| tree.open_buffer("", cx))
            .await
            .unwrap();
        let save = buffer.update(&mut cx, |buffer, cx| {
            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
            buffer.save(cx).unwrap()
        });
        save.await.unwrap();

        let new_text = std::fs::read_to_string(file_path).unwrap();
        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
    }
3141
    // Renames and deletes files on disk, then checks that the local worktree keeps entry
    // ids and open buffers attached to the renamed files, and that a remote copy becomes
    // consistent after applying an update built against the initial snapshot.
    #[gpui::test]
    async fn test_rescan_and_remote_updates(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            "a": {
                "file1": "",
                "file2": "",
                "file3": "",
            },
            "b": {
                "c": {
                    "file4": "",
                    "file5": "",
                }
            }
        }));

        let user_id = 5;
        let mut client = Client::new();
        let server = FakeServer::for_client(user_id, &mut client, &cx).await;
        let user_store = server.build_user_store(client.clone(), &mut cx).await;
        let tree = Worktree::open_local(
            client,
            user_store.clone(),
            dir.path(),
            Arc::new(RealFs),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        // Helpers: open a buffer by relative path, and look up an entry id by relative path.
        let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| {
            let buffer = tree.update(cx, |tree, cx| tree.open_buffer(path, cx));
            async move { buffer.await.unwrap() }
        };
        let id_for_path = |path: &'static str, cx: &gpui::TestAppContext| {
            tree.read_with(cx, |tree, _| {
                tree.entry_for_path(path)
                    .expect(&format!("no entry for path {}", path))
                    .id
            })
        };

        let buffer2 = buffer_for_path("a/file2", &mut cx).await;
        let buffer3 = buffer_for_path("a/file3", &mut cx).await;
        let buffer4 = buffer_for_path("b/c/file4", &mut cx).await;
        let buffer5 = buffer_for_path("b/c/file5", &mut cx).await;

        // Remember the ids before anything is renamed.
        let file2_id = id_for_path("a/file2", &cx);
        let file3_id = id_for_path("a/file3", &cx);
        let file4_id = id_for_path("b/c/file4", &cx);

        // Wait for the initial scan.
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;

        // Create a remote copy of this worktree.
        let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
        let worktree_id = 1;
        let share_request = tree.update(&mut cx, |tree, cx| {
            tree.as_local().unwrap().share_request(cx)
        });
        let open_worktree = server.receive::<proto::OpenWorktree>().await.unwrap();
        server
            .respond(
                open_worktree.receipt(),
                proto::OpenWorktreeResponse { worktree_id: 1 },
            )
            .await;

        let remote = Worktree::remote(
            proto::JoinWorktreeResponse {
                worktree: share_request.await.unwrap().worktree,
                replica_id: 1,
                collaborators: Vec::new(),
            },
            Client::new(),
            user_store,
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        cx.read(|cx| {
            assert!(!buffer2.read(cx).is_dirty());
            assert!(!buffer3.read(cx).is_dirty());
            assert!(!buffer4.read(cx).is_dirty());
            assert!(!buffer5.read(cx).is_dirty());
        });

        // Rename and delete files and directories.
        tree.flush_fs_events(&cx).await;
        std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
        std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
        std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap();
        std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
        tree.flush_fs_events(&cx).await;

        let expected_paths = vec![
            "a",
            "a/file1",
            "a/file2.new",
            "b",
            "d",
            "d/file3",
            "d/file4",
        ];

        cx.read(|app| {
            assert_eq!(
                tree.read(app)
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );

            // Renamed files keep the ids they had before the rename.
            assert_eq!(id_for_path("a/file2.new", &cx), file2_id);
            assert_eq!(id_for_path("d/file3", &cx), file3_id);
            assert_eq!(id_for_path("d/file4", &cx), file4_id);

            // Open buffers follow their files to the new paths; the deleted file keeps
            // its old path.
            assert_eq!(
                buffer2.read(app).file().unwrap().path().as_ref(),
                Path::new("a/file2.new")
            );
            assert_eq!(
                buffer3.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file3")
            );
            assert_eq!(
                buffer4.read(app).file().unwrap().path().as_ref(),
                Path::new("d/file4")
            );
            assert_eq!(
                buffer5.read(app).file().unwrap().path().as_ref(),
                Path::new("b/c/file5")
            );

            assert!(!buffer2.read(app).file().unwrap().is_deleted());
            assert!(!buffer3.read(app).file().unwrap().is_deleted());
            assert!(!buffer4.read(app).file().unwrap().is_deleted());
            assert!(buffer5.read(app).file().unwrap().is_deleted());
        });

        // Update the remote worktree. Check that it becomes consistent with the
        // local worktree.
        remote.update(&mut cx, |remote, cx| {
            let update_message =
                tree.read(cx)
                    .snapshot()
                    .build_update(&initial_snapshot, worktree_id, true);
            remote
                .as_remote_mut()
                .unwrap()
                .snapshot
                .apply_update(update_message)
                .unwrap();

            assert_eq!(
                remote
                    .paths()
                    .map(|p| p.to_str().unwrap())
                    .collect::<Vec<_>>(),
                expected_paths
            );
        });
    }
3310
    // Checks ignore statuses after the initial scan and again for files created afterwards.
    #[gpui::test]
    async fn test_rescan_with_gitignore(mut cx: gpui::TestAppContext) {
        let dir = temp_tree(json!({
            ".git": {},
            ".gitignore": "ignored-dir\n",
            "tracked-dir": {
                "tracked-file1": "tracked contents",
            },
            "ignored-dir": {
                "ignored-file1": "ignored contents",
            }
        }));

        let client = Client::new();
        let http_client = FakeHttpClient::with_404_response();
        let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));

        let tree = Worktree::open_local(
            client,
            user_store,
            dir.path(),
            Arc::new(RealFs),
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();
        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
            .await;
        tree.flush_fs_events(&cx).await;
        cx.read(|cx| {
            let tree = tree.read(cx);
            let tracked = tree.entry_for_path("tracked-dir/tracked-file1").unwrap();
            let ignored = tree.entry_for_path("ignored-dir/ignored-file1").unwrap();
            assert_eq!(tracked.is_ignored, false);
            assert_eq!(ignored.is_ignored, true);
        });

        // Files created after the initial scan must pick up the same ignore rules.
        std::fs::write(dir.path().join("tracked-dir/tracked-file2"), "").unwrap();
        std::fs::write(dir.path().join("ignored-dir/ignored-file2"), "").unwrap();
        tree.flush_fs_events(&cx).await;
        cx.read(|cx| {
            let tree = tree.read(cx);
            let dot_git = tree.entry_for_path(".git").unwrap();
            let tracked = tree.entry_for_path("tracked-dir/tracked-file2").unwrap();
            let ignored = tree.entry_for_path("ignored-dir/ignored-file2").unwrap();
            assert_eq!(tracked.is_ignored, false);
            assert_eq!(ignored.is_ignored, true);
            // `.git` is expected to be ignored even though the `.gitignore` does not list it.
            assert_eq!(dot_git.is_ignored, true);
        });
    }
3362
    // Checks the messages exchanged with the server when a local worktree is opened and
    // later dropped: an `OpenWorktree` request carrying the root name and the logins from
    // `.zed.toml`, and a `CloseWorktree` message once the handle is dropped.
    #[gpui::test]
    async fn test_open_and_share_worktree(mut cx: gpui::TestAppContext) {
        let user_id = 100;
        let mut client = Client::new();
        let server = FakeServer::for_client(user_id, &mut client, &cx).await;
        let user_store = server.build_user_store(client.clone(), &mut cx).await;

        let fs = Arc::new(FakeFs::new());
        fs.insert_tree(
            "/path",
            json!({
                "to": {
                    "the-dir": {
                        ".zed.toml": r#"collaborators = ["friend-1", "friend-2"]"#,
                        "a.txt": "a-contents",
                    },
                },
            }),
        )
        .await;

        let worktree = Worktree::open_local(
            client.clone(),
            user_store,
            "/path/to/the-dir".as_ref(),
            fs,
            Default::default(),
            &mut cx.to_async(),
        )
        .await
        .unwrap();

        let open_worktree = server.receive::<proto::OpenWorktree>().await.unwrap();
        assert_eq!(
            open_worktree.payload,
            proto::OpenWorktree {
                root_name: "the-dir".to_string(),
                authorized_logins: vec!["friend-1".to_string(), "friend-2".to_string()],
            }
        );

        // The id sent back by the server becomes the worktree's remote id.
        server
            .respond(
                open_worktree.receipt(),
                proto::OpenWorktreeResponse { worktree_id: 5 },
            )
            .await;
        let remote_id = worktree
            .update(&mut cx, |tree, _| tree.as_local().unwrap().next_remote_id())
            .await;
        assert_eq!(remote_id, Some(5));

        // Dropping the handle must result in a `CloseWorktree` message.
        cx.update(move |_| drop(worktree));
        server.receive::<proto::CloseWorktree>().await.unwrap();
    }
3418
3419 #[gpui::test]
3420 async fn test_buffer_is_dirty(mut cx: gpui::TestAppContext) {
3421 use std::fs;
3422
3423 let dir = temp_tree(json!({
3424 "file1": "abc",
3425 "file2": "def",
3426 "file3": "ghi",
3427 }));
3428 let client = Client::new();
3429 let http_client = FakeHttpClient::with_404_response();
3430 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3431
3432 let tree = Worktree::open_local(
3433 client,
3434 user_store,
3435 dir.path(),
3436 Arc::new(RealFs),
3437 Default::default(),
3438 &mut cx.to_async(),
3439 )
3440 .await
3441 .unwrap();
3442 tree.flush_fs_events(&cx).await;
3443 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3444 .await;
3445
3446 let buffer1 = tree
3447 .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
3448 .await
3449 .unwrap();
3450 let events = Rc::new(RefCell::new(Vec::new()));
3451
3452 // initially, the buffer isn't dirty.
3453 buffer1.update(&mut cx, |buffer, cx| {
3454 cx.subscribe(&buffer1, {
3455 let events = events.clone();
3456 move |_, _, event, _| events.borrow_mut().push(event.clone())
3457 })
3458 .detach();
3459
3460 assert!(!buffer.is_dirty());
3461 assert!(events.borrow().is_empty());
3462
3463 buffer.edit(vec![1..2], "", cx);
3464 });
3465
3466 // after the first edit, the buffer is dirty, and emits a dirtied event.
3467 buffer1.update(&mut cx, |buffer, cx| {
3468 assert!(buffer.text() == "ac");
3469 assert!(buffer.is_dirty());
3470 assert_eq!(
3471 *events.borrow(),
3472 &[
3473 language::Event::Edited(Patch::new(vec![text::Edit {
3474 old: 1..2,
3475 new: 1..1
3476 }])),
3477 language::Event::Dirtied
3478 ]
3479 );
3480 events.borrow_mut().clear();
3481 buffer.did_save(buffer.version(), buffer.file().unwrap().mtime(), None, cx);
3482 });
3483
3484 // after saving, the buffer is not dirty, and emits a saved event.
3485 buffer1.update(&mut cx, |buffer, cx| {
3486 assert!(!buffer.is_dirty());
3487 assert_eq!(*events.borrow(), &[language::Event::Saved]);
3488 events.borrow_mut().clear();
3489
3490 buffer.edit(vec![1..1], "B", cx);
3491 buffer.edit(vec![2..2], "D", cx);
3492 });
3493
3494 // after editing again, the buffer is dirty, and emits another dirty event.
3495 buffer1.update(&mut cx, |buffer, cx| {
3496 assert!(buffer.text() == "aBDc");
3497 assert!(buffer.is_dirty());
3498 assert_eq!(
3499 *events.borrow(),
3500 &[
3501 language::Event::Edited(Patch::new(vec![text::Edit {
3502 old: 1..1,
3503 new: 1..2
3504 }])),
3505 language::Event::Dirtied,
3506 language::Event::Edited(Patch::new(vec![text::Edit {
3507 old: 2..2,
3508 new: 2..3
3509 }])),
3510 ],
3511 );
3512 events.borrow_mut().clear();
3513
3514 // TODO - currently, after restoring the buffer to its
3515 // previously-saved state, the is still considered dirty.
3516 buffer.edit([1..3], "", cx);
3517 assert!(buffer.text() == "ac");
3518 assert!(buffer.is_dirty());
3519 });
3520
3521 assert_eq!(
3522 *events.borrow(),
3523 &[language::Event::Edited(Patch::new(vec![text::Edit {
3524 old: 1..3,
3525 new: 1..1
3526 }]))]
3527 );
3528
3529 // When a file is deleted, the buffer is considered dirty.
3530 let events = Rc::new(RefCell::new(Vec::new()));
3531 let buffer2 = tree
3532 .update(&mut cx, |tree, cx| tree.open_buffer("file2", cx))
3533 .await
3534 .unwrap();
3535 buffer2.update(&mut cx, |_, cx| {
3536 cx.subscribe(&buffer2, {
3537 let events = events.clone();
3538 move |_, _, event, _| events.borrow_mut().push(event.clone())
3539 })
3540 .detach();
3541 });
3542
3543 fs::remove_file(dir.path().join("file2")).unwrap();
3544 buffer2.condition(&cx, |b, _| b.is_dirty()).await;
3545 assert_eq!(
3546 *events.borrow(),
3547 &[language::Event::Dirtied, language::Event::FileHandleChanged]
3548 );
3549
3550 // When a file is already dirty when deleted, we don't emit a Dirtied event.
3551 let events = Rc::new(RefCell::new(Vec::new()));
3552 let buffer3 = tree
3553 .update(&mut cx, |tree, cx| tree.open_buffer("file3", cx))
3554 .await
3555 .unwrap();
3556 buffer3.update(&mut cx, |_, cx| {
3557 cx.subscribe(&buffer3, {
3558 let events = events.clone();
3559 move |_, _, event, _| events.borrow_mut().push(event.clone())
3560 })
3561 .detach();
3562 });
3563
3564 tree.flush_fs_events(&cx).await;
3565 buffer3.update(&mut cx, |buffer, cx| {
3566 buffer.edit(Some(0..0), "x", cx);
3567 });
3568 events.borrow_mut().clear();
3569 fs::remove_file(dir.path().join("file3")).unwrap();
3570 buffer3
3571 .condition(&cx, |_, _| !events.borrow().is_empty())
3572 .await;
3573 assert_eq!(*events.borrow(), &[language::Event::FileHandleChanged]);
3574 cx.read(|cx| assert!(buffer3.read(cx).is_dirty()));
3575 }
3576
3577 #[gpui::test]
3578 async fn test_buffer_file_changes_on_disk(mut cx: gpui::TestAppContext) {
3579 use std::fs;
3580 use text::{Point, Selection, SelectionGoal};
3581
3582 let initial_contents = "aaa\nbbbbb\nc\n";
3583 let dir = temp_tree(json!({ "the-file": initial_contents }));
3584 let client = Client::new();
3585 let http_client = FakeHttpClient::with_404_response();
3586 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3587
3588 let tree = Worktree::open_local(
3589 client,
3590 user_store,
3591 dir.path(),
3592 Arc::new(RealFs),
3593 Default::default(),
3594 &mut cx.to_async(),
3595 )
3596 .await
3597 .unwrap();
3598 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3599 .await;
3600
3601 let abs_path = dir.path().join("the-file");
3602 let buffer = tree
3603 .update(&mut cx, |tree, cx| {
3604 tree.open_buffer(Path::new("the-file"), cx)
3605 })
3606 .await
3607 .unwrap();
3608
3609 // Add a cursor on each row.
3610 let selection_set_id = buffer.update(&mut cx, |buffer, cx| {
3611 assert!(!buffer.is_dirty());
3612 buffer.add_selection_set(
3613 &(0..3)
3614 .map(|row| Selection {
3615 id: row as usize,
3616 start: Point::new(row, 1),
3617 end: Point::new(row, 1),
3618 reversed: false,
3619 goal: SelectionGoal::None,
3620 })
3621 .collect::<Vec<_>>(),
3622 cx,
3623 )
3624 });
3625
3626 // Change the file on disk, adding two new lines of text, and removing
3627 // one line.
3628 buffer.read_with(&cx, |buffer, _| {
3629 assert!(!buffer.is_dirty());
3630 assert!(!buffer.has_conflict());
3631 });
3632 let new_contents = "AAAA\naaa\nBB\nbbbbb\n";
3633 fs::write(&abs_path, new_contents).unwrap();
3634
3635 // Because the buffer was not modified, it is reloaded from disk. Its
3636 // contents are edited according to the diff between the old and new
3637 // file contents.
3638 buffer
3639 .condition(&cx, |buffer, _| buffer.text() == new_contents)
3640 .await;
3641
3642 buffer.update(&mut cx, |buffer, _| {
3643 assert_eq!(buffer.text(), new_contents);
3644 assert!(!buffer.is_dirty());
3645 assert!(!buffer.has_conflict());
3646
3647 let cursor_positions = buffer
3648 .selection_set(selection_set_id)
3649 .unwrap()
3650 .selections::<Point>(&*buffer)
3651 .map(|selection| {
3652 assert_eq!(selection.start, selection.end);
3653 selection.start
3654 })
3655 .collect::<Vec<_>>();
3656 assert_eq!(
3657 cursor_positions,
3658 [Point::new(1, 1), Point::new(3, 1), Point::new(4, 0)]
3659 );
3660 });
3661
3662 // Modify the buffer
3663 buffer.update(&mut cx, |buffer, cx| {
3664 buffer.edit(vec![0..0], " ", cx);
3665 assert!(buffer.is_dirty());
3666 assert!(!buffer.has_conflict());
3667 });
3668
3669 // Change the file on disk again, adding blank lines to the beginning.
3670 fs::write(&abs_path, "\n\n\nAAAA\naaa\nBB\nbbbbb\n").unwrap();
3671
3672 // Because the buffer is modified, it doesn't reload from disk, but is
3673 // marked as having a conflict.
3674 buffer
3675 .condition(&cx, |buffer, _| buffer.has_conflict())
3676 .await;
3677 }
3678
3679 #[gpui::test]
3680 async fn test_language_server_diagnostics(mut cx: gpui::TestAppContext) {
3681 let (language_server_config, mut fake_server) =
3682 LanguageServerConfig::fake(cx.background()).await;
3683 let mut languages = LanguageRegistry::new();
3684 languages.add(Arc::new(Language::new(
3685 LanguageConfig {
3686 name: "Rust".to_string(),
3687 path_suffixes: vec!["rs".to_string()],
3688 language_server: Some(language_server_config),
3689 ..Default::default()
3690 },
3691 Some(tree_sitter_rust::language()),
3692 )));
3693
3694 let dir = temp_tree(json!({
3695 "a.rs": "fn a() { A }",
3696 "b.rs": "const y: i32 = 1",
3697 }));
3698
3699 let client = Client::new();
3700 let http_client = FakeHttpClient::with_404_response();
3701 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3702
3703 let tree = Worktree::open_local(
3704 client,
3705 user_store,
3706 dir.path(),
3707 Arc::new(RealFs),
3708 Arc::new(languages),
3709 &mut cx.to_async(),
3710 )
3711 .await
3712 .unwrap();
3713 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3714 .await;
3715
3716 // Cause worktree to start the fake language server
3717 let _buffer = tree
3718 .update(&mut cx, |tree, cx| tree.open_buffer("b.rs", cx))
3719 .await
3720 .unwrap();
3721
3722 fake_server
3723 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
3724 uri: Url::from_file_path(dir.path().join("a.rs")).unwrap(),
3725 version: None,
3726 diagnostics: vec![lsp::Diagnostic {
3727 range: lsp::Range::new(lsp::Position::new(0, 9), lsp::Position::new(0, 10)),
3728 severity: Some(lsp::DiagnosticSeverity::ERROR),
3729 message: "undefined variable 'A'".to_string(),
3730 ..Default::default()
3731 }],
3732 })
3733 .await;
3734
3735 let buffer = tree
3736 .update(&mut cx, |tree, cx| tree.open_buffer("a.rs", cx))
3737 .await
3738 .unwrap();
3739
3740 buffer.read_with(&cx, |buffer, _| {
3741 let diagnostics = buffer
3742 .diagnostics_in_range(0..buffer.len())
3743 .collect::<Vec<_>>();
3744 assert_eq!(
3745 diagnostics,
3746 &[(
3747 Point::new(0, 9)..Point::new(0, 10),
3748 &Diagnostic {
3749 severity: lsp::DiagnosticSeverity::ERROR,
3750 message: "undefined variable 'A'".to_string(),
3751 group_id: 0,
3752 is_primary: true
3753 }
3754 )]
3755 )
3756 });
3757 }
3758
3759 #[gpui::test(iterations = 100)]
3760 fn test_random(mut rng: StdRng) {
3761 let operations = env::var("OPERATIONS")
3762 .map(|o| o.parse().unwrap())
3763 .unwrap_or(40);
3764 let initial_entries = env::var("INITIAL_ENTRIES")
3765 .map(|o| o.parse().unwrap())
3766 .unwrap_or(20);
3767
3768 let root_dir = tempdir::TempDir::new("worktree-test").unwrap();
3769 for _ in 0..initial_entries {
3770 randomly_mutate_tree(root_dir.path(), 1.0, &mut rng).unwrap();
3771 }
3772 log::info!("Generated initial tree");
3773
3774 let (notify_tx, _notify_rx) = smol::channel::unbounded();
3775 let fs = Arc::new(RealFs);
3776 let next_entry_id = Arc::new(AtomicUsize::new(0));
3777 let mut initial_snapshot = Snapshot {
3778 id: 0,
3779 scan_id: 0,
3780 abs_path: root_dir.path().into(),
3781 entries_by_path: Default::default(),
3782 entries_by_id: Default::default(),
3783 removed_entry_ids: Default::default(),
3784 ignores: Default::default(),
3785 root_name: Default::default(),
3786 root_char_bag: Default::default(),
3787 next_entry_id: next_entry_id.clone(),
3788 };
3789 initial_snapshot.insert_entry(
3790 Entry::new(
3791 Path::new("").into(),
3792 &smol::block_on(fs.metadata(root_dir.path()))
3793 .unwrap()
3794 .unwrap(),
3795 &next_entry_id,
3796 Default::default(),
3797 ),
3798 fs.as_ref(),
3799 );
3800 let mut scanner = BackgroundScanner::new(
3801 Arc::new(Mutex::new(initial_snapshot.clone())),
3802 notify_tx,
3803 fs.clone(),
3804 Arc::new(gpui::executor::Background::new()),
3805 );
3806 smol::block_on(scanner.scan_dirs()).unwrap();
3807 scanner.snapshot().check_invariants();
3808
3809 let mut events = Vec::new();
3810 let mut snapshots = Vec::new();
3811 let mut mutations_len = operations;
3812 while mutations_len > 1 {
3813 if !events.is_empty() && rng.gen_bool(0.4) {
3814 let len = rng.gen_range(0..=events.len());
3815 let to_deliver = events.drain(0..len).collect::<Vec<_>>();
3816 log::info!("Delivering events: {:#?}", to_deliver);
3817 smol::block_on(scanner.process_events(to_deliver));
3818 scanner.snapshot().check_invariants();
3819 } else {
3820 events.extend(randomly_mutate_tree(root_dir.path(), 0.6, &mut rng).unwrap());
3821 mutations_len -= 1;
3822 }
3823
3824 if rng.gen_bool(0.2) {
3825 snapshots.push(scanner.snapshot());
3826 }
3827 }
3828 log::info!("Quiescing: {:#?}", events);
3829 smol::block_on(scanner.process_events(events));
3830 scanner.snapshot().check_invariants();
3831
3832 let (notify_tx, _notify_rx) = smol::channel::unbounded();
3833 let mut new_scanner = BackgroundScanner::new(
3834 Arc::new(Mutex::new(initial_snapshot)),
3835 notify_tx,
3836 scanner.fs.clone(),
3837 scanner.executor.clone(),
3838 );
3839 smol::block_on(new_scanner.scan_dirs()).unwrap();
3840 assert_eq!(
3841 scanner.snapshot().to_vec(true),
3842 new_scanner.snapshot().to_vec(true)
3843 );
3844
3845 for mut prev_snapshot in snapshots {
3846 let include_ignored = rng.gen::<bool>();
3847 if !include_ignored {
3848 let mut entries_by_path_edits = Vec::new();
3849 let mut entries_by_id_edits = Vec::new();
3850 for entry in prev_snapshot
3851 .entries_by_id
3852 .cursor::<()>()
3853 .filter(|e| e.is_ignored)
3854 {
3855 entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
3856 entries_by_id_edits.push(Edit::Remove(entry.id));
3857 }
3858
3859 prev_snapshot
3860 .entries_by_path
3861 .edit(entries_by_path_edits, &());
3862 prev_snapshot.entries_by_id.edit(entries_by_id_edits, &());
3863 }
3864
3865 let update = scanner
3866 .snapshot()
3867 .build_update(&prev_snapshot, 0, include_ignored);
3868 prev_snapshot.apply_update(update).unwrap();
3869 assert_eq!(
3870 prev_snapshot.to_vec(true),
3871 scanner.snapshot().to_vec(include_ignored)
3872 );
3873 }
3874 }
3875
3876 fn randomly_mutate_tree(
3877 root_path: &Path,
3878 insertion_probability: f64,
3879 rng: &mut impl Rng,
3880 ) -> Result<Vec<fsevent::Event>> {
3881 let root_path = root_path.canonicalize().unwrap();
3882 let (dirs, files) = read_dir_recursive(root_path.clone());
3883
3884 let mut events = Vec::new();
3885 let mut record_event = |path: PathBuf| {
3886 events.push(fsevent::Event {
3887 event_id: SystemTime::now()
3888 .duration_since(UNIX_EPOCH)
3889 .unwrap()
3890 .as_secs(),
3891 flags: fsevent::StreamFlags::empty(),
3892 path,
3893 });
3894 };
3895
3896 if (files.is_empty() && dirs.len() == 1) || rng.gen_bool(insertion_probability) {
3897 let path = dirs.choose(rng).unwrap();
3898 let new_path = path.join(gen_name(rng));
3899
3900 if rng.gen() {
3901 log::info!("Creating dir {:?}", new_path.strip_prefix(root_path)?);
3902 std::fs::create_dir(&new_path)?;
3903 } else {
3904 log::info!("Creating file {:?}", new_path.strip_prefix(root_path)?);
3905 std::fs::write(&new_path, "")?;
3906 }
3907 record_event(new_path);
3908 } else if rng.gen_bool(0.05) {
3909 let ignore_dir_path = dirs.choose(rng).unwrap();
3910 let ignore_path = ignore_dir_path.join(&*GITIGNORE);
3911
3912 let (subdirs, subfiles) = read_dir_recursive(ignore_dir_path.clone());
3913 let files_to_ignore = {
3914 let len = rng.gen_range(0..=subfiles.len());
3915 subfiles.choose_multiple(rng, len)
3916 };
3917 let dirs_to_ignore = {
3918 let len = rng.gen_range(0..subdirs.len());
3919 subdirs.choose_multiple(rng, len)
3920 };
3921
3922 let mut ignore_contents = String::new();
3923 for path_to_ignore in files_to_ignore.chain(dirs_to_ignore) {
3924 write!(
3925 ignore_contents,
3926 "{}\n",
3927 path_to_ignore
3928 .strip_prefix(&ignore_dir_path)?
3929 .to_str()
3930 .unwrap()
3931 )
3932 .unwrap();
3933 }
3934 log::info!(
3935 "Creating {:?} with contents:\n{}",
3936 ignore_path.strip_prefix(&root_path)?,
3937 ignore_contents
3938 );
3939 std::fs::write(&ignore_path, ignore_contents).unwrap();
3940 record_event(ignore_path);
3941 } else {
3942 let old_path = {
3943 let file_path = files.choose(rng);
3944 let dir_path = dirs[1..].choose(rng);
3945 file_path.into_iter().chain(dir_path).choose(rng).unwrap()
3946 };
3947
3948 let is_rename = rng.gen();
3949 if is_rename {
3950 let new_path_parent = dirs
3951 .iter()
3952 .filter(|d| !d.starts_with(old_path))
3953 .choose(rng)
3954 .unwrap();
3955
3956 let overwrite_existing_dir =
3957 !old_path.starts_with(&new_path_parent) && rng.gen_bool(0.3);
3958 let new_path = if overwrite_existing_dir {
3959 std::fs::remove_dir_all(&new_path_parent).ok();
3960 new_path_parent.to_path_buf()
3961 } else {
3962 new_path_parent.join(gen_name(rng))
3963 };
3964
3965 log::info!(
3966 "Renaming {:?} to {}{:?}",
3967 old_path.strip_prefix(&root_path)?,
3968 if overwrite_existing_dir {
3969 "overwrite "
3970 } else {
3971 ""
3972 },
3973 new_path.strip_prefix(&root_path)?
3974 );
3975 std::fs::rename(&old_path, &new_path)?;
3976 record_event(old_path.clone());
3977 record_event(new_path);
3978 } else if old_path.is_dir() {
3979 let (dirs, files) = read_dir_recursive(old_path.clone());
3980
3981 log::info!("Deleting dir {:?}", old_path.strip_prefix(&root_path)?);
3982 std::fs::remove_dir_all(&old_path).unwrap();
3983 for file in files {
3984 record_event(file);
3985 }
3986 for dir in dirs {
3987 record_event(dir);
3988 }
3989 } else {
3990 log::info!("Deleting file {:?}", old_path.strip_prefix(&root_path)?);
3991 std::fs::remove_file(old_path).unwrap();
3992 record_event(old_path.clone());
3993 }
3994 }
3995
3996 Ok(events)
3997 }
3998
// Walks the directory tree rooted at `path` and returns `(dirs, files)`.
//
// `dirs` starts with `path` itself, followed by every directory found beneath
// it; `files` holds every other path encountered. Panics if a directory
// cannot be read.
fn read_dir_recursive(path: PathBuf) -> (Vec<PathBuf>, Vec<PathBuf>) {
    // Visits `dir`, recording it before descending into its children.
    fn walk(dir: PathBuf, dirs: &mut Vec<PathBuf>, files: &mut Vec<PathBuf>) {
        let children = std::fs::read_dir(&dir).unwrap();
        dirs.push(dir);
        for child in children {
            let child_path = child.unwrap().path();
            if child_path.is_dir() {
                walk(child_path, dirs, files);
            } else {
                files.push(child_path);
            }
        }
    }

    let mut dirs = Vec::new();
    let mut files = Vec::new();
    walk(path, &mut dirs, &mut files);
    (dirs, files)
}
4015
4016 fn gen_name(rng: &mut impl Rng) -> String {
4017 (0..6)
4018 .map(|_| rng.sample(rand::distributions::Alphanumeric))
4019 .map(char::from)
4020 .collect()
4021 }
4022
4023 impl Snapshot {
4024 fn check_invariants(&self) {
4025 let mut files = self.files(true, 0);
4026 let mut visible_files = self.files(false, 0);
4027 for entry in self.entries_by_path.cursor::<()>() {
4028 if entry.is_file() {
4029 assert_eq!(files.next().unwrap().inode, entry.inode);
4030 if !entry.is_ignored {
4031 assert_eq!(visible_files.next().unwrap().inode, entry.inode);
4032 }
4033 }
4034 }
4035 assert!(files.next().is_none());
4036 assert!(visible_files.next().is_none());
4037
4038 let mut bfs_paths = Vec::new();
4039 let mut stack = vec![Path::new("")];
4040 while let Some(path) = stack.pop() {
4041 bfs_paths.push(path);
4042 let ix = stack.len();
4043 for child_entry in self.child_entries(path) {
4044 stack.insert(ix, &child_entry.path);
4045 }
4046 }
4047
4048 let dfs_paths = self
4049 .entries_by_path
4050 .cursor::<()>()
4051 .map(|e| e.path.as_ref())
4052 .collect::<Vec<_>>();
4053 assert_eq!(bfs_paths, dfs_paths);
4054
4055 for (ignore_parent_path, _) in &self.ignores {
4056 assert!(self.entry_for_path(ignore_parent_path).is_some());
4057 assert!(self
4058 .entry_for_path(ignore_parent_path.join(&*GITIGNORE))
4059 .is_some());
4060 }
4061 }
4062
4063 fn to_vec(&self, include_ignored: bool) -> Vec<(&Path, u64, bool)> {
4064 let mut paths = Vec::new();
4065 for entry in self.entries_by_path.cursor::<()>() {
4066 if include_ignored || !entry.is_ignored {
4067 paths.push((entry.path.as_ref(), entry.inode, entry.is_ignored));
4068 }
4069 }
4070 paths.sort_by(|a, b| a.0.cmp(&b.0));
4071 paths
4072 }
4073 }
4074}