1use super::{
2 fs::{self, Fs},
3 ignore::IgnoreStack,
4};
5use ::ignore::gitignore::{Gitignore, GitignoreBuilder};
6use anyhow::{anyhow, Context, Result};
7use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
8use clock::ReplicaId;
9use futures::{Stream, StreamExt};
10use fuzzy::CharBag;
11use gpui::{
12 executor, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
13 Task, UpgradeModelHandle, WeakModelHandle,
14};
15use language::{Buffer, Language, LanguageRegistry, Operation, Rope};
16use lazy_static::lazy_static;
17use lsp::LanguageServer;
18use parking_lot::Mutex;
19use postage::{
20 prelude::{Sink as _, Stream as _},
21 watch,
22};
23
24use serde::Deserialize;
25use smol::channel::{self, Sender};
26use std::{
27 any::Any,
28 cmp::{self, Ordering},
29 collections::HashMap,
30 convert::{TryFrom, TryInto},
31 ffi::{OsStr, OsString},
32 fmt,
33 future::Future,
34 ops::Deref,
35 path::{Path, PathBuf},
36 sync::{
37 atomic::{AtomicUsize, Ordering::SeqCst},
38 Arc,
39 },
40 time::{Duration, SystemTime},
41};
42use sum_tree::Bias;
43use sum_tree::{Edit, SeekTarget, SumTree};
44use util::{ResultExt, TryFutureExt};
45
lazy_static! {
    // The `.gitignore` file name as an `OsStr`, constructed once on first access.
    static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
}
49
/// Scan status sent to a local worktree over its scan-state channel.
#[derive(Clone, Debug)]
enum ScanState {
    /// Presumably: no scan is currently running (TODO confirm against the scanner).
    Idle,
    /// A scan is running; this is the variant `LocalWorktree::is_scanning` looks for.
    Scanning,
    /// Carries a shared error; presumably a failed scan (TODO confirm against the scanner).
    Err(Arc<anyhow::Error>),
}
56
/// A worktree model, which is either local or remote.
pub enum Worktree {
    /// Created by `Worktree::open_local` from a path and an `Fs` implementation.
    Local(LocalWorktree),
    /// Created by `Worktree::open_remote` by joining a worktree through the client.
    Remote(RemoteWorktree),
}
61
/// Events emitted by the `Worktree` model.
pub enum Event {
    /// Emitted when an `UnshareWorktree` message is handled.
    Closed,
}
65
/// A peer participating in a worktree, as described by a `proto::Collaborator` message.
#[derive(Clone, Debug)]
pub struct Collaborator {
    /// The user record fetched from the `UserStore` for the message's `user_id`.
    pub user: Arc<User>,
    /// The peer id carried by the message.
    pub peer_id: PeerId,
    /// The replica id carried by the message.
    pub replica_id: ReplicaId,
}
72
73impl Collaborator {
74 fn from_proto(
75 message: proto::Collaborator,
76 user_store: &ModelHandle<UserStore>,
77 cx: &mut AsyncAppContext,
78 ) -> impl Future<Output = Result<Self>> {
79 let user = user_store.update(cx, |user_store, cx| {
80 user_store.fetch_user(message.user_id, cx)
81 });
82
83 async move {
84 Ok(Self {
85 peer_id: PeerId(message.peer_id),
86 user: user.await?,
87 replica_id: message.replica_id as ReplicaId,
88 })
89 }
90 }
91}
92
93impl Entity for Worktree {
94 type Event = Event;
95
96 fn release(&mut self, cx: &mut MutableAppContext) {
97 match self {
98 Self::Local(tree) => {
99 if let Some(worktree_id) = *tree.remote_id.borrow() {
100 let rpc = tree.client.clone();
101 cx.spawn(|_| async move {
102 if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await {
103 log::error!("error closing worktree: {}", err);
104 }
105 })
106 .detach();
107 }
108 }
109 Self::Remote(tree) => {
110 let rpc = tree.client.clone();
111 let worktree_id = tree.remote_id;
112 cx.spawn(|_| async move {
113 if let Err(err) = rpc.send(proto::LeaveWorktree { worktree_id }).await {
114 log::error!("error closing worktree: {}", err);
115 }
116 })
117 .detach();
118 }
119 }
120 }
121
122 fn app_will_quit(
123 &mut self,
124 _: &mut MutableAppContext,
125 ) -> Option<std::pin::Pin<Box<dyn 'static + Future<Output = ()>>>> {
126 use futures::FutureExt;
127
128 if let Self::Local(worktree) = self {
129 let shutdown_futures = worktree
130 .language_servers
131 .drain()
132 .filter_map(|(_, server)| server.shutdown())
133 .collect::<Vec<_>>();
134 Some(
135 async move {
136 futures::future::join_all(shutdown_futures).await;
137 }
138 .boxed(),
139 )
140 } else {
141 None
142 }
143 }
144}
145
146impl Worktree {
147 pub async fn open_local(
148 client: Arc<Client>,
149 user_store: ModelHandle<UserStore>,
150 path: impl Into<Arc<Path>>,
151 fs: Arc<dyn Fs>,
152 languages: Arc<LanguageRegistry>,
153 cx: &mut AsyncAppContext,
154 ) -> Result<ModelHandle<Self>> {
155 let (tree, scan_states_tx) =
156 LocalWorktree::new(client, user_store, path, fs.clone(), languages, cx).await?;
157 tree.update(cx, |tree, cx| {
158 let tree = tree.as_local_mut().unwrap();
159 let abs_path = tree.snapshot.abs_path.clone();
160 let background_snapshot = tree.background_snapshot.clone();
161 let background = cx.background().clone();
162 tree._background_scanner_task = Some(cx.background().spawn(async move {
163 let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
164 let scanner =
165 BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
166 scanner.run(events).await;
167 }));
168 });
169 Ok(tree)
170 }
171
172 pub async fn open_remote(
173 client: Arc<Client>,
174 id: u64,
175 languages: Arc<LanguageRegistry>,
176 user_store: ModelHandle<UserStore>,
177 cx: &mut AsyncAppContext,
178 ) -> Result<ModelHandle<Self>> {
179 let response = client
180 .request(proto::JoinWorktree { worktree_id: id })
181 .await?;
182 Worktree::remote(response, client, user_store, languages, cx).await
183 }
184
    /// Builds a remote worktree model from a `JoinWorktreeResponse`.
    ///
    /// Fails if the response contains no worktree, or if loading the collaborators' users
    /// fails.
    async fn remote(
        join_response: proto::JoinWorktreeResponse,
        client: Arc<Client>,
        user_store: ModelHandle<UserStore>,
        languages: Arc<LanguageRegistry>,
        cx: &mut AsyncAppContext,
    ) -> Result<ModelHandle<Self>> {
        let worktree = join_response
            .worktree
            .ok_or_else(|| anyhow!("empty worktree"))?;

        let remote_id = worktree.id;
        let replica_id = join_response.replica_id as ReplicaId;
        // Lowercased characters of the root name, collected into a `CharBag`.
        let root_char_bag: CharBag = worktree
            .root_name
            .chars()
            .map(|c| c.to_ascii_lowercase())
            .collect();
        let root_name = worktree.root_name.clone();
        // Build both entry trees from the received entries on the background executor.
        let (entries_by_path, entries_by_id) = cx
            .background()
            .spawn(async move {
                let mut entries_by_path_edits = Vec::new();
                let mut entries_by_id_edits = Vec::new();
                for entry in worktree.entries {
                    match Entry::try_from((&root_char_bag, entry)) {
                        Ok(entry) => {
                            entries_by_id_edits.push(Edit::Insert(PathEntry {
                                id: entry.id,
                                path: entry.path.clone(),
                                is_ignored: entry.is_ignored,
                                scan_id: 0,
                            }));
                            entries_by_path_edits.push(Edit::Insert(entry));
                        }
                        // Entries that fail to convert are logged and skipped.
                        Err(err) => log::warn!("error for remote worktree entry {:?}", err),
                    }
                }

                let mut entries_by_path = SumTree::new();
                let mut entries_by_id = SumTree::new();
                entries_by_path.edit(entries_by_path_edits, &());
                entries_by_id.edit(entries_by_id_edits, &());
                (entries_by_path, entries_by_id)
            })
            .await;

        // Load all collaborators' users in one call before converting each collaborator.
        let user_ids = join_response
            .collaborators
            .iter()
            .map(|peer| peer.user_id)
            .collect();
        user_store
            .update(cx, |user_store, cx| user_store.load_users(user_ids, cx))
            .await?;
        let mut collaborators = HashMap::with_capacity(join_response.collaborators.len());
        for message in join_response.collaborators {
            let collaborator = Collaborator::from_proto(message, &user_store, cx).await?;
            collaborators.insert(collaborator.peer_id, collaborator);
        }

        let worktree = cx.update(|cx| {
            cx.add_model(|cx: &mut ModelContext<Worktree>| {
                let snapshot = Snapshot {
                    id: cx.model_id(),
                    scan_id: 0,
                    abs_path: Path::new("").into(),
                    root_name,
                    root_char_bag,
                    ignores: Default::default(),
                    entries_by_path,
                    entries_by_id,
                    removed_entry_ids: Default::default(),
                    next_entry_id: Default::default(),
                };

                let (updates_tx, mut updates_rx) = postage::mpsc::channel(64);
                let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone());

                // Background task: apply each received update to a copy of the latest
                // snapshot and publish the result on the watch channel. A failed update is
                // logged, and the (possibly partially updated) copy is still published.
                cx.background()
                    .spawn(async move {
                        while let Some(update) = updates_rx.recv().await {
                            let mut snapshot = snapshot_tx.borrow().clone();
                            if let Err(error) = snapshot.apply_update(update) {
                                log::error!("error applying worktree update: {}", error);
                            }
                            *snapshot_tx.borrow_mut() = snapshot;
                        }
                    })
                    .detach();

                // Foreground task: poll the model whenever the watch channel yields a value,
                // stopping once the weak handle can no longer be upgraded.
                {
                    let mut snapshot_rx = snapshot_rx.clone();
                    cx.spawn_weak(|this, mut cx| async move {
                        while let Some(_) = snapshot_rx.recv().await {
                            if let Some(this) = cx.read(|cx| this.upgrade(cx)) {
                                this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
                            } else {
                                break;
                            }
                        }
                    })
                    .detach();
                }

                // Message subscriptions for this remote id; they are stored on the worktree.
                let _subscriptions = vec![
                    client.subscribe_to_entity(remote_id, cx, Self::handle_add_collaborator),
                    client.subscribe_to_entity(remote_id, cx, Self::handle_remove_collaborator),
                    client.subscribe_to_entity(remote_id, cx, Self::handle_update),
                    client.subscribe_to_entity(remote_id, cx, Self::handle_update_buffer),
                    client.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved),
                    client.subscribe_to_entity(remote_id, cx, Self::handle_unshare),
                ];

                Worktree::Remote(RemoteWorktree {
                    remote_id,
                    replica_id,
                    snapshot,
                    snapshot_rx,
                    updates_tx,
                    client: client.clone(),
                    open_buffers: Default::default(),
                    collaborators,
                    queued_operations: Default::default(),
                    languages,
                    user_store,
                    _subscriptions,
                })
            })
        });

        Ok(worktree)
    }
318
319 pub fn as_local(&self) -> Option<&LocalWorktree> {
320 if let Worktree::Local(worktree) = self {
321 Some(worktree)
322 } else {
323 None
324 }
325 }
326
327 pub fn as_local_mut(&mut self) -> Option<&mut LocalWorktree> {
328 if let Worktree::Local(worktree) = self {
329 Some(worktree)
330 } else {
331 None
332 }
333 }
334
335 pub fn as_remote_mut(&mut self) -> Option<&mut RemoteWorktree> {
336 if let Worktree::Remote(worktree) = self {
337 Some(worktree)
338 } else {
339 None
340 }
341 }
342
343 pub fn snapshot(&self) -> Snapshot {
344 match self {
345 Worktree::Local(worktree) => worktree.snapshot(),
346 Worktree::Remote(worktree) => worktree.snapshot(),
347 }
348 }
349
350 pub fn replica_id(&self) -> ReplicaId {
351 match self {
352 Worktree::Local(_) => 0,
353 Worktree::Remote(worktree) => worktree.replica_id,
354 }
355 }
356
357 pub fn languages(&self) -> &Arc<LanguageRegistry> {
358 match self {
359 Worktree::Local(worktree) => &worktree.languages,
360 Worktree::Remote(worktree) => &worktree.languages,
361 }
362 }
363
364 pub fn user_store(&self) -> &ModelHandle<UserStore> {
365 match self {
366 Worktree::Local(worktree) => &worktree.user_store,
367 Worktree::Remote(worktree) => &worktree.user_store,
368 }
369 }
370
371 pub fn handle_add_collaborator(
372 &mut self,
373 mut envelope: TypedEnvelope<proto::AddCollaborator>,
374 _: Arc<Client>,
375 cx: &mut ModelContext<Self>,
376 ) -> Result<()> {
377 let user_store = self.user_store().clone();
378 let collaborator = envelope
379 .payload
380 .collaborator
381 .take()
382 .ok_or_else(|| anyhow!("empty collaborator"))?;
383
384 cx.spawn(|this, mut cx| {
385 async move {
386 let collaborator =
387 Collaborator::from_proto(collaborator, &user_store, &mut cx).await?;
388 this.update(&mut cx, |this, cx| match this {
389 Worktree::Local(worktree) => worktree.add_collaborator(collaborator, cx),
390 Worktree::Remote(worktree) => worktree.add_collaborator(collaborator, cx),
391 });
392 Ok(())
393 }
394 .log_err()
395 })
396 .detach();
397
398 Ok(())
399 }
400
401 pub fn handle_remove_collaborator(
402 &mut self,
403 envelope: TypedEnvelope<proto::RemoveCollaborator>,
404 _: Arc<Client>,
405 cx: &mut ModelContext<Self>,
406 ) -> Result<()> {
407 match self {
408 Worktree::Local(worktree) => worktree.remove_collaborator(envelope, cx),
409 Worktree::Remote(worktree) => worktree.remove_collaborator(envelope, cx),
410 }
411 }
412
413 pub fn handle_update(
414 &mut self,
415 envelope: TypedEnvelope<proto::UpdateWorktree>,
416 _: Arc<Client>,
417 cx: &mut ModelContext<Self>,
418 ) -> anyhow::Result<()> {
419 self.as_remote_mut()
420 .unwrap()
421 .update_from_remote(envelope, cx)
422 }
423
424 pub fn handle_open_buffer(
425 &mut self,
426 envelope: TypedEnvelope<proto::OpenBuffer>,
427 rpc: Arc<Client>,
428 cx: &mut ModelContext<Self>,
429 ) -> anyhow::Result<()> {
430 let receipt = envelope.receipt();
431
432 let response = self
433 .as_local_mut()
434 .unwrap()
435 .open_remote_buffer(envelope, cx);
436
437 cx.background()
438 .spawn(
439 async move {
440 rpc.respond(receipt, response.await?).await?;
441 Ok(())
442 }
443 .log_err(),
444 )
445 .detach();
446
447 Ok(())
448 }
449
450 pub fn handle_close_buffer(
451 &mut self,
452 envelope: TypedEnvelope<proto::CloseBuffer>,
453 _: Arc<Client>,
454 cx: &mut ModelContext<Self>,
455 ) -> anyhow::Result<()> {
456 self.as_local_mut()
457 .unwrap()
458 .close_remote_buffer(envelope, cx)
459 }
460
461 pub fn collaborators(&self) -> &HashMap<PeerId, Collaborator> {
462 match self {
463 Worktree::Local(worktree) => &worktree.collaborators,
464 Worktree::Remote(worktree) => &worktree.collaborators,
465 }
466 }
467
468 pub fn open_buffer(
469 &mut self,
470 path: impl AsRef<Path>,
471 cx: &mut ModelContext<Self>,
472 ) -> Task<Result<ModelHandle<Buffer>>> {
473 match self {
474 Worktree::Local(worktree) => worktree.open_buffer(path.as_ref(), cx),
475 Worktree::Remote(worktree) => worktree.open_buffer(path.as_ref(), cx),
476 }
477 }
478
479 #[cfg(feature = "test-support")]
480 pub fn has_open_buffer(&self, path: impl AsRef<Path>, cx: &AppContext) -> bool {
481 let mut open_buffers: Box<dyn Iterator<Item = _>> = match self {
482 Worktree::Local(worktree) => Box::new(worktree.open_buffers.values()),
483 Worktree::Remote(worktree) => {
484 Box::new(worktree.open_buffers.values().filter_map(|buf| {
485 if let RemoteBuffer::Loaded(buf) = buf {
486 Some(buf)
487 } else {
488 None
489 }
490 }))
491 }
492 };
493
494 let path = path.as_ref();
495 open_buffers
496 .find(|buffer| {
497 if let Some(file) = buffer.upgrade(cx).and_then(|buffer| buffer.read(cx).file()) {
498 file.path().as_ref() == path
499 } else {
500 false
501 }
502 })
503 .is_some()
504 }
505
    /// Handles an `UpdateBuffer` message by applying its operations to the target buffer.
    ///
    /// Returns an error if an operation fails to deserialize, if applying operations fails,
    /// or (local worktrees only) if the buffer id does not refer to a live open buffer.
    pub fn handle_update_buffer(
        &mut self,
        envelope: TypedEnvelope<proto::UpdateBuffer>,
        _: Arc<Client>,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        let payload = envelope.payload.clone();
        let buffer_id = payload.buffer_id as usize;
        // Deserialize every operation up front; the first failure aborts the whole message.
        let ops = payload
            .operations
            .into_iter()
            .map(|op| language::proto::deserialize_operation(op))
            .collect::<Result<Vec<_>, _>>()?;

        match self {
            Worktree::Local(worktree) => {
                let buffer = worktree
                    .open_buffers
                    .get(&buffer_id)
                    .and_then(|buf| buf.upgrade(cx))
                    .ok_or_else(|| {
                        anyhow!("invalid buffer {} in update buffer message", buffer_id)
                    })?;
                buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
            }
            Worktree::Remote(worktree) => match worktree.open_buffers.get_mut(&buffer_id) {
                // Operations are already being queued for this buffer: append to the queue.
                Some(RemoteBuffer::Operations(pending_ops)) => pending_ops.extend(ops),
                Some(RemoteBuffer::Loaded(buffer)) => {
                    if let Some(buffer) = buffer.upgrade(cx) {
                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
                    } else {
                        // The weak handle is dead: replace the entry with queued operations.
                        worktree
                            .open_buffers
                            .insert(buffer_id, RemoteBuffer::Operations(ops));
                    }
                }
                // Unknown buffer id: start queueing operations under that id.
                None => {
                    worktree
                        .open_buffers
                        .insert(buffer_id, RemoteBuffer::Operations(ops));
                }
            },
        }

        Ok(())
    }
552
553 pub fn handle_save_buffer(
554 &mut self,
555 envelope: TypedEnvelope<proto::SaveBuffer>,
556 rpc: Arc<Client>,
557 cx: &mut ModelContext<Self>,
558 ) -> Result<()> {
559 let sender_id = envelope.original_sender_id()?;
560 let buffer = self
561 .as_local()
562 .unwrap()
563 .shared_buffers
564 .get(&sender_id)
565 .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
566 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
567
568 let receipt = envelope.receipt();
569 let worktree_id = envelope.payload.worktree_id;
570 let buffer_id = envelope.payload.buffer_id;
571 let save = cx.spawn(|_, mut cx| async move {
572 buffer.update(&mut cx, |buffer, cx| buffer.save(cx))?.await
573 });
574
575 cx.background()
576 .spawn(
577 async move {
578 let (version, mtime) = save.await?;
579
580 rpc.respond(
581 receipt,
582 proto::BufferSaved {
583 worktree_id,
584 buffer_id,
585 version: (&version).into(),
586 mtime: Some(mtime.into()),
587 },
588 )
589 .await?;
590
591 Ok(())
592 }
593 .log_err(),
594 )
595 .detach();
596
597 Ok(())
598 }
599
600 pub fn handle_buffer_saved(
601 &mut self,
602 envelope: TypedEnvelope<proto::BufferSaved>,
603 _: Arc<Client>,
604 cx: &mut ModelContext<Self>,
605 ) -> Result<()> {
606 let payload = envelope.payload.clone();
607 let worktree = self.as_remote_mut().unwrap();
608 if let Some(buffer) = worktree
609 .open_buffers
610 .get(&(payload.buffer_id as usize))
611 .and_then(|buf| buf.upgrade(cx))
612 {
613 buffer.update(cx, |buffer, cx| {
614 let version = payload.version.try_into()?;
615 let mtime = payload
616 .mtime
617 .ok_or_else(|| anyhow!("missing mtime"))?
618 .into();
619 buffer.did_save(version, mtime, None, cx);
620 Result::<_, anyhow::Error>::Ok(())
621 })?;
622 }
623 Ok(())
624 }
625
    /// Handles an `UnshareWorktree` message by emitting `Event::Closed`.
    ///
    /// The message contents and the client are not inspected; this always returns `Ok`.
    pub fn handle_unshare(
        &mut self,
        _: TypedEnvelope<proto::UnshareWorktree>,
        _: Arc<Client>,
        cx: &mut ModelContext<Self>,
    ) -> Result<()> {
        cx.emit(Event::Closed);
        Ok(())
    }
635
    /// Refreshes the foreground snapshot from its source and notifies observers.
    ///
    /// Local: copies the background snapshot. While a scan is in progress it schedules one
    /// follow-up poll (if none is pending); otherwise it drops any pending poll and refreshes
    /// the open buffers' files. Remote: copies the latest snapshot from the watch channel and
    /// refreshes the open buffers' files.
    fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
        match self {
            Self::Local(worktree) => {
                let is_fake_fs = worktree.fs.is_fake();
                worktree.snapshot = worktree.background_snapshot.lock().clone();
                if worktree.is_scanning() {
                    if worktree.poll_task.is_none() {
                        worktree.poll_task = Some(cx.spawn(|this, mut cx| async move {
                            // With a fake filesystem just yield once; otherwise wait 100ms
                            // before polling again.
                            if is_fake_fs {
                                smol::future::yield_now().await;
                            } else {
                                smol::Timer::after(Duration::from_millis(100)).await;
                            }
                            this.update(&mut cx, |this, cx| {
                                // Clear the slot first so the nested poll can schedule a new task.
                                this.as_local_mut().unwrap().poll_task = None;
                                this.poll_snapshot(cx);
                            })
                        }));
                    }
                } else {
                    worktree.poll_task.take();
                    self.update_open_buffers(cx);
                }
            }
            Self::Remote(worktree) => {
                worktree.snapshot = worktree.snapshot_rx.borrow().clone();
                self.update_open_buffers(cx);
            }
        };

        cx.notify();
    }
668
    /// Rebuilds the `File` of every live open buffer from the current snapshot and forgets
    /// buffers whose weak handles are dead.
    ///
    /// For remote worktrees only `RemoteBuffer::Loaded` entries are visited.
    fn update_open_buffers(&mut self, cx: &mut ModelContext<Self>) {
        let open_buffers: Box<dyn Iterator<Item = _>> = match &self {
            Self::Local(worktree) => Box::new(worktree.open_buffers.iter()),
            Self::Remote(worktree) => {
                Box::new(worktree.open_buffers.iter().filter_map(|(id, buf)| {
                    if let RemoteBuffer::Loaded(buf) = buf {
                        Some((id, buf))
                    } else {
                        None
                    }
                }))
            }
        };

        let local = self.as_local().is_some();
        let worktree_path = self.abs_path.clone();
        let worktree_handle = cx.handle();
        // Ids are collected here and removed after the loop, since `open_buffers` is still
        // borrowed while iterating.
        let mut buffers_to_delete = Vec::new();
        for (buffer_id, buffer) in open_buffers {
            if let Some(buffer) = buffer.upgrade(cx) {
                buffer.update(cx, |buffer, cx| {
                    if let Some(old_file) = buffer.file() {
                        // Resolve the entry by id first, then by path; if neither is found,
                        // keep the old path and mtime with no entry id.
                        let new_file = if let Some(entry) = old_file
                            .entry_id()
                            .and_then(|entry_id| self.entry_for_id(entry_id))
                        {
                            File {
                                is_local: local,
                                worktree_path: worktree_path.clone(),
                                entry_id: Some(entry.id),
                                mtime: entry.mtime,
                                path: entry.path.clone(),
                                worktree: worktree_handle.clone(),
                            }
                        } else if let Some(entry) = self.entry_for_path(old_file.path().as_ref()) {
                            File {
                                is_local: local,
                                worktree_path: worktree_path.clone(),
                                entry_id: Some(entry.id),
                                mtime: entry.mtime,
                                path: entry.path.clone(),
                                worktree: worktree_handle.clone(),
                            }
                        } else {
                            File {
                                is_local: local,
                                worktree_path: worktree_path.clone(),
                                entry_id: None,
                                path: old_file.path().clone(),
                                mtime: old_file.mtime(),
                                worktree: worktree_handle.clone(),
                            }
                        };

                        // `file_updated` may return a task; if so it is detached.
                        if let Some(task) = buffer.file_updated(Box::new(new_file), cx) {
                            task.detach();
                        }
                    }
                });
            } else {
                buffers_to_delete.push(*buffer_id);
            }
        }

        for buffer_id in buffers_to_delete {
            match self {
                Self::Local(worktree) => {
                    worktree.open_buffers.remove(&buffer_id);
                }
                Self::Remote(worktree) => {
                    worktree.open_buffers.remove(&buffer_id);
                }
            }
        }
    }
744
    /// Applies published diagnostics to the open buffer for the affected file, or stores them
    /// by path when no live open buffer matches.
    ///
    /// Returns an error if this worktree is not local, if the URI is not a file path, if the
    /// path is not inside the worktree, or if updating the buffer's diagnostics fails.
    fn update_diagnostics(
        &mut self,
        params: lsp::PublishDiagnosticsParams,
        cx: &mut ModelContext<Worktree>,
    ) -> Result<()> {
        let this = self.as_local_mut().ok_or_else(|| anyhow!("not local"))?;
        // Convert the URI into a path relative to the worktree root.
        let file_path = params
            .uri
            .to_file_path()
            .map_err(|_| anyhow!("URI is not a file"))?
            .strip_prefix(&this.abs_path)
            .context("path is not within worktree")?
            .to_owned();

        for buffer in this.open_buffers.values() {
            if let Some(buffer) = buffer.upgrade(cx) {
                if buffer
                    .read(cx)
                    .file()
                    .map_or(false, |file| file.path().as_ref() == file_path)
                {
                    let (remote_id, operation) = buffer.update(cx, |buffer, cx| {
                        (
                            buffer.remote_id(),
                            buffer.update_diagnostics(params.version, params.diagnostics, cx),
                        )
                    });
                    // Forward the resulting operation; only the first matching buffer is updated.
                    self.send_buffer_update(remote_id, operation?, cx);
                    return Ok(());
                }
            }
        }

        // No live buffer for this path: keep the diagnostics, replacing any stored earlier.
        this.diagnostics.insert(file_path, params.diagnostics);
        Ok(())
    }
781
782 fn send_buffer_update(
783 &mut self,
784 buffer_id: u64,
785 operation: Operation,
786 cx: &mut ModelContext<Self>,
787 ) {
788 if let Some((rpc, remote_id)) = match self {
789 Worktree::Local(worktree) => worktree
790 .remote_id
791 .borrow()
792 .map(|id| (worktree.client.clone(), id)),
793 Worktree::Remote(worktree) => Some((worktree.client.clone(), worktree.remote_id)),
794 } {
795 cx.spawn(|worktree, mut cx| async move {
796 if let Err(error) = rpc
797 .request(proto::UpdateBuffer {
798 worktree_id: remote_id,
799 buffer_id,
800 operations: vec![language::proto::serialize_operation(&operation)],
801 })
802 .await
803 {
804 worktree.update(&mut cx, |worktree, _| {
805 log::error!("error sending buffer operation: {}", error);
806 match worktree {
807 Worktree::Local(t) => &mut t.queued_operations,
808 Worktree::Remote(t) => &mut t.queued_operations,
809 }
810 .push((buffer_id, operation));
811 });
812 }
813 })
814 .detach();
815 }
816 }
817}
818
819impl Deref for Worktree {
820 type Target = Snapshot;
821
822 fn deref(&self) -> &Self::Target {
823 match self {
824 Worktree::Local(worktree) => &worktree.snapshot,
825 Worktree::Remote(worktree) => &worktree.snapshot,
826 }
827 }
828}
829
/// State of a worktree created by `Worktree::open_local`.
pub struct LocalWorktree {
    // Foreground copy of the snapshot; refreshed from `background_snapshot` when polled.
    snapshot: Snapshot,
    // Settings parsed from `.zed.toml` under the worktree path, or defaults if unavailable.
    config: WorktreeConfig,
    // Snapshot shared with the background scanner.
    background_snapshot: Arc<Mutex<Snapshot>>,
    // Most recent scan state forwarded from the scan-state channel.
    last_scan_state_rx: watch::Receiver<ScanState>,
    // Task running the background scanner; set by `Worktree::open_local`.
    _background_scanner_task: Option<Task<()>>,
    // Task that requests a remote id from the server whenever the client reports `Connected`.
    _maintain_remote_id_task: Task<Option<()>>,
    // Pending follow-up poll scheduled while a scan is in progress.
    poll_task: Option<Task<()>>,
    // Remote id published by `_maintain_remote_id_task`; `None` while not connected.
    remote_id: watch::Receiver<Option<u64>>,
    // When present, completed-scan snapshots are submitted through its `snapshots_tx`.
    share: Option<ShareState>,
    // Weak handles to open buffers, keyed by buffer id.
    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
    // Strong handles to buffers opened on behalf of peers, keyed by peer and then buffer id.
    shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
    // Diagnostics received for paths without a live open buffer; consumed when one is opened.
    diagnostics: HashMap<PathBuf, Vec<lsp::Diagnostic>>,
    // Known collaborators, keyed by peer id.
    collaborators: HashMap<PeerId, Collaborator>,
    // Buffer operations whose send request failed, paired with their buffer id.
    queued_operations: Vec<(u64, Operation)>,
    languages: Arc<LanguageRegistry>,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    fs: Arc<dyn Fs>,
    // Running language servers, keyed by language name.
    language_servers: HashMap<String, Arc<LanguageServer>>,
}
851
/// Per-worktree settings deserialized from the `.zed.toml` file under the worktree path.
#[derive(Default, Deserialize)]
struct WorktreeConfig {
    // Sent to the server as `authorized_logins` in the `OpenWorktree` request.
    collaborators: Vec<String>,
}
856
857impl LocalWorktree {
    /// Creates the local worktree model for `path` and returns it together with the sender
    /// half of its scan-state channel.
    ///
    /// Fails if fetching metadata for the path fails. A missing or unparsable `.zed.toml`
    /// is not an error; the default config is used instead.
    async fn new(
        client: Arc<Client>,
        user_store: ModelHandle<UserStore>,
        path: impl Into<Arc<Path>>,
        fs: Arc<dyn Fs>,
        languages: Arc<LanguageRegistry>,
        cx: &mut AsyncAppContext,
    ) -> Result<(ModelHandle<Worktree>, Sender<ScanState>)> {
        let abs_path = path.into();
        // The root entry is stored under the empty relative path.
        let path: Arc<Path> = Arc::from(Path::new(""));
        let next_entry_id = AtomicUsize::new(0);

        // After determining whether the root entry is a file or a directory, populate the
        // snapshot's "root name", which will be used for the purpose of fuzzy matching.
        let root_name = abs_path
            .file_name()
            .map_or(String::new(), |f| f.to_string_lossy().to_string());
        let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
        let metadata = fs.metadata(&abs_path).await?;

        let mut config = WorktreeConfig::default();
        if let Ok(zed_toml) = fs.load(&abs_path.join(".zed.toml")).await {
            if let Ok(parsed) = toml::from_str(&zed_toml) {
                config = parsed;
            }
        }

        let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
        // The watch channel starts out in the `Scanning` state.
        let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
        let tree = cx.add_model(move |cx: &mut ModelContext<Worktree>| {
            let mut snapshot = Snapshot {
                id: cx.model_id(),
                scan_id: 0,
                abs_path,
                root_name: root_name.clone(),
                root_char_bag,
                ignores: Default::default(),
                entries_by_path: Default::default(),
                entries_by_id: Default::default(),
                removed_entry_ids: Default::default(),
                next_entry_id: Arc::new(next_entry_id),
            };
            // Insert the root entry only when metadata for the path was found.
            if let Some(metadata) = metadata {
                snapshot.insert_entry(
                    Entry::new(
                        path.into(),
                        &metadata,
                        &snapshot.next_entry_id,
                        snapshot.root_char_bag,
                    ),
                    fs.as_ref(),
                );
            }

            let (mut remote_id_tx, remote_id_rx) = watch::channel();
            // On every client status change: request a worktree id while connected, publish
            // `None` otherwise. A failed request ends the task (the error is logged).
            let _maintain_remote_id_task = cx.spawn_weak({
                let rpc = client.clone();
                move |this, cx| {
                    async move {
                        let mut status = rpc.status();
                        while let Some(status) = status.recv().await {
                            if let Some(this) = this.upgrade(&cx) {
                                let remote_id = if let client::Status::Connected { .. } = status {
                                    let authorized_logins = this.read_with(&cx, |this, _| {
                                        this.as_local().unwrap().config.collaborators.clone()
                                    });
                                    let response = rpc
                                        .request(proto::OpenWorktree {
                                            root_name: root_name.clone(),
                                            authorized_logins,
                                        })
                                        .await?;

                                    Some(response.worktree_id)
                                } else {
                                    None
                                };
                                if remote_id_tx.send(remote_id).await.is_err() {
                                    break;
                                }
                            }
                        }
                        Ok(())
                    }
                    .log_err()
                }
            });

            let tree = Self {
                snapshot: snapshot.clone(),
                config,
                remote_id: remote_id_rx,
                background_snapshot: Arc::new(Mutex::new(snapshot)),
                last_scan_state_rx,
                _background_scanner_task: None,
                _maintain_remote_id_task,
                share: None,
                poll_task: None,
                open_buffers: Default::default(),
                shared_buffers: Default::default(),
                diagnostics: Default::default(),
                queued_operations: Default::default(),
                collaborators: Default::default(),
                languages,
                client,
                user_store,
                fs,
                language_servers: Default::default(),
            };

            // Forward each scan state to the watch channel and poll the snapshot. When the
            // scan is no longer in progress and `share` is set, submit the snapshot to
            // `share.snapshots_tx`. Stops once the weak handle can no longer be upgraded.
            cx.spawn_weak(|this, mut cx| async move {
                while let Ok(scan_state) = scan_states_rx.recv().await {
                    if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
                        let to_send = handle.update(&mut cx, |this, cx| {
                            last_scan_state_tx.blocking_send(scan_state).ok();
                            this.poll_snapshot(cx);
                            let tree = this.as_local_mut().unwrap();
                            if !tree.is_scanning() {
                                if let Some(share) = tree.share.as_ref() {
                                    return Some((tree.snapshot(), share.snapshots_tx.clone()));
                                }
                            }
                            None
                        });

                        if let Some((snapshot, snapshots_to_send_tx)) = to_send {
                            if let Err(err) = snapshots_to_send_tx.send(snapshot).await {
                                log::error!("error submitting snapshot to send {}", err);
                            }
                        }
                    } else {
                        break;
                    }
                }
            })
            .detach();

            Worktree::Local(tree)
        });

        Ok((tree, scan_states_tx))
    }
1000
1001 pub fn languages(&self) -> &LanguageRegistry {
1002 &self.languages
1003 }
1004
1005 pub fn ensure_language_server(
1006 &mut self,
1007 language: &Language,
1008 cx: &mut ModelContext<Worktree>,
1009 ) -> Option<Arc<LanguageServer>> {
1010 if let Some(server) = self.language_servers.get(language.name()) {
1011 return Some(server.clone());
1012 }
1013
1014 if let Some(language_server) = language
1015 .start_server(self.abs_path(), cx)
1016 .log_err()
1017 .flatten()
1018 {
1019 let (diagnostics_tx, diagnostics_rx) = smol::channel::unbounded();
1020 language_server
1021 .on_notification::<lsp::notification::PublishDiagnostics, _>(move |params| {
1022 smol::block_on(diagnostics_tx.send(params)).ok();
1023 })
1024 .detach();
1025 cx.spawn_weak(|this, mut cx| async move {
1026 while let Ok(diagnostics) = diagnostics_rx.recv().await {
1027 if let Some(handle) = cx.read(|cx| this.upgrade(cx)) {
1028 handle.update(&mut cx, |this, cx| {
1029 this.update_diagnostics(diagnostics, cx).log_err();
1030 });
1031 } else {
1032 break;
1033 }
1034 }
1035 })
1036 .detach();
1037
1038 self.language_servers
1039 .insert(language.name().to_string(), language_server.clone());
1040 Some(language_server.clone())
1041 } else {
1042 None
1043 }
1044 }
1045
    /// Opens the buffer for `path`, reusing an already-open buffer for that path if one is
    /// still alive.
    ///
    /// Otherwise the file is loaded, a language and language server are selected for it, any
    /// stored diagnostics for the path are applied, and the new buffer is registered in
    /// `open_buffers`.
    pub fn open_buffer(
        &mut self,
        path: &Path,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        let handle = cx.handle();

        // If there is already a buffer for the given path, then return it.
        // The same pass also drops entries whose weak handles are dead.
        let mut existing_buffer = None;
        self.open_buffers.retain(|_buffer_id, buffer| {
            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                if let Some(file) = buffer.read(cx.as_ref()).file() {
                    if file.worktree_id() == handle.id() && file.path().as_ref() == path {
                        existing_buffer = Some(buffer);
                    }
                }
                true
            } else {
                false
            }
        });

        let path = Arc::from(path);
        cx.spawn(|this, mut cx| async move {
            if let Some(existing_buffer) = existing_buffer {
                Ok(existing_buffer)
            } else {
                let (file, contents) = this
                    .update(&mut cx, |this, cx| this.as_local().unwrap().load(&path, cx))
                    .await?;
                let language = this.read_with(&cx, |this, _| {
                    use language::File;
                    this.languages().select_language(file.full_path()).cloned()
                });
                // Take any diagnostics stored for this path and make sure a language server
                // exists for the selected language (if any).
                let (diagnostics, language_server) = this.update(&mut cx, |this, cx| {
                    let this = this.as_local_mut().unwrap();
                    (
                        this.diagnostics.remove(path.as_ref()),
                        language
                            .as_ref()
                            .and_then(|language| this.ensure_language_server(language, cx)),
                    )
                });
                let buffer = cx.add_model(|cx| {
                    let mut buffer = Buffer::from_file(0, contents, Box::new(file), cx);
                    buffer.set_language(language, language_server, cx);
                    if let Some(diagnostics) = diagnostics {
                        // NOTE(review): this panics if applying the stored diagnostics fails.
                        buffer.update_diagnostics(None, diagnostics, cx).unwrap();
                    }
                    buffer
                });
                this.update(&mut cx, |this, _| {
                    let this = this
                        .as_local_mut()
                        .ok_or_else(|| anyhow!("must be a local worktree"))?;

                    this.open_buffers.insert(buffer.id(), buffer.downgrade());
                    Ok(buffer)
                })
            }
        })
    }
1108
1109 pub fn open_remote_buffer(
1110 &mut self,
1111 envelope: TypedEnvelope<proto::OpenBuffer>,
1112 cx: &mut ModelContext<Worktree>,
1113 ) -> Task<Result<proto::OpenBufferResponse>> {
1114 let peer_id = envelope.original_sender_id();
1115 let path = Path::new(&envelope.payload.path);
1116
1117 let buffer = self.open_buffer(path, cx);
1118
1119 cx.spawn(|this, mut cx| async move {
1120 let buffer = buffer.await?;
1121 this.update(&mut cx, |this, cx| {
1122 this.as_local_mut()
1123 .unwrap()
1124 .shared_buffers
1125 .entry(peer_id?)
1126 .or_default()
1127 .insert(buffer.id() as u64, buffer.clone());
1128
1129 Ok(proto::OpenBufferResponse {
1130 buffer: Some(buffer.update(cx.as_mut(), |buffer, _| buffer.to_proto())),
1131 })
1132 })
1133 })
1134 }
1135
1136 pub fn close_remote_buffer(
1137 &mut self,
1138 envelope: TypedEnvelope<proto::CloseBuffer>,
1139 cx: &mut ModelContext<Worktree>,
1140 ) -> Result<()> {
1141 if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
1142 shared_buffers.remove(&envelope.payload.buffer_id);
1143 cx.notify();
1144 }
1145
1146 Ok(())
1147 }
1148
1149 pub fn add_collaborator(
1150 &mut self,
1151 collaborator: Collaborator,
1152 cx: &mut ModelContext<Worktree>,
1153 ) {
1154 self.collaborators
1155 .insert(collaborator.peer_id, collaborator);
1156 cx.notify();
1157 }
1158
1159 pub fn remove_collaborator(
1160 &mut self,
1161 envelope: TypedEnvelope<proto::RemoveCollaborator>,
1162 cx: &mut ModelContext<Worktree>,
1163 ) -> Result<()> {
1164 let peer_id = PeerId(envelope.payload.peer_id);
1165 let replica_id = self
1166 .collaborators
1167 .remove(&peer_id)
1168 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?
1169 .replica_id;
1170 self.shared_buffers.remove(&peer_id);
1171 for (_, buffer) in &self.open_buffers {
1172 if let Some(buffer) = buffer.upgrade(cx) {
1173 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
1174 }
1175 }
1176 cx.notify();
1177
1178 Ok(())
1179 }
1180
1181 pub fn scan_complete(&self) -> impl Future<Output = ()> {
1182 let mut scan_state_rx = self.last_scan_state_rx.clone();
1183 async move {
1184 let mut scan_state = Some(scan_state_rx.borrow().clone());
1185 while let Some(ScanState::Scanning) = scan_state {
1186 scan_state = scan_state_rx.recv().await;
1187 }
1188 }
1189 }
1190
1191 pub fn remote_id(&self) -> Option<u64> {
1192 *self.remote_id.borrow()
1193 }
1194
1195 pub fn next_remote_id(&self) -> impl Future<Output = Option<u64>> {
1196 let mut remote_id = self.remote_id.clone();
1197 async move {
1198 while let Some(remote_id) = remote_id.recv().await {
1199 if remote_id.is_some() {
1200 return remote_id;
1201 }
1202 }
1203 None
1204 }
1205 }
1206
1207 fn is_scanning(&self) -> bool {
1208 if let ScanState::Scanning = *self.last_scan_state_rx.borrow() {
1209 true
1210 } else {
1211 false
1212 }
1213 }
1214
1215 pub fn snapshot(&self) -> Snapshot {
1216 self.snapshot.clone()
1217 }
1218
1219 pub fn abs_path(&self) -> &Path {
1220 self.snapshot.abs_path.as_ref()
1221 }
1222
1223 pub fn contains_abs_path(&self, path: &Path) -> bool {
1224 path.starts_with(&self.snapshot.abs_path)
1225 }
1226
1227 fn absolutize(&self, path: &Path) -> PathBuf {
1228 if path.file_name().is_some() {
1229 self.snapshot.abs_path.join(path)
1230 } else {
1231 self.snapshot.abs_path.to_path_buf()
1232 }
1233 }
1234
1235 fn load(&self, path: &Path, cx: &mut ModelContext<Worktree>) -> Task<Result<(File, String)>> {
1236 let handle = cx.handle();
1237 let path = Arc::from(path);
1238 let worktree_path = self.abs_path.clone();
1239 let abs_path = self.absolutize(&path);
1240 let background_snapshot = self.background_snapshot.clone();
1241 let fs = self.fs.clone();
1242 cx.spawn(|this, mut cx| async move {
1243 let text = fs.load(&abs_path).await?;
1244 // Eagerly populate the snapshot with an updated entry for the loaded file
1245 let entry = refresh_entry(fs.as_ref(), &background_snapshot, path, &abs_path).await?;
1246 this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
1247 Ok((
1248 File {
1249 entry_id: Some(entry.id),
1250 worktree: handle,
1251 worktree_path,
1252 path: entry.path,
1253 mtime: entry.mtime,
1254 is_local: true,
1255 },
1256 text,
1257 ))
1258 })
1259 }
1260
1261 pub fn save_buffer_as(
1262 &self,
1263 buffer: ModelHandle<Buffer>,
1264 path: impl Into<Arc<Path>>,
1265 text: Rope,
1266 cx: &mut ModelContext<Worktree>,
1267 ) -> Task<Result<File>> {
1268 let save = self.save(path, text, cx);
1269 cx.spawn(|this, mut cx| async move {
1270 let entry = save.await?;
1271 this.update(&mut cx, |this, cx| {
1272 let this = this.as_local_mut().unwrap();
1273 this.open_buffers.insert(buffer.id(), buffer.downgrade());
1274 Ok(File {
1275 entry_id: Some(entry.id),
1276 worktree: cx.handle(),
1277 worktree_path: this.abs_path.clone(),
1278 path: entry.path,
1279 mtime: entry.mtime,
1280 is_local: true,
1281 })
1282 })
1283 })
1284 }
1285
1286 fn save(
1287 &self,
1288 path: impl Into<Arc<Path>>,
1289 text: Rope,
1290 cx: &mut ModelContext<Worktree>,
1291 ) -> Task<Result<Entry>> {
1292 let path = path.into();
1293 let abs_path = self.absolutize(&path);
1294 let background_snapshot = self.background_snapshot.clone();
1295 let fs = self.fs.clone();
1296 let save = cx.background().spawn(async move {
1297 fs.save(&abs_path, &text).await?;
1298 refresh_entry(fs.as_ref(), &background_snapshot, path.clone(), &abs_path).await
1299 });
1300
1301 cx.spawn(|this, mut cx| async move {
1302 let entry = save.await?;
1303 this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
1304 Ok(entry)
1305 })
1306 }
1307
    /// Shares this worktree with the server and starts streaming snapshot updates.
    ///
    /// Steps: wait for the share request (which needs a remote id), send it, spawn a
    /// background task that diffs each queued snapshot against the last one sent successfully,
    /// then subscribe to collaborator/buffer messages and store the resulting `ShareState`.
    ///
    /// Resolves to the worktree's remote id. Fails if no share request could be built or if
    /// the share RPC fails.
    pub fn share(&mut self, cx: &mut ModelContext<Worktree>) -> Task<anyhow::Result<u64>> {
        // Captured before any awaiting; this is the baseline the first update is diffed against.
        let snapshot = self.snapshot();
        let share_request = self.share_request(cx);
        let rpc = self.client.clone();
        cx.spawn(|this, mut cx| async move {
            let share_request = if let Some(request) = share_request.await {
                request
            } else {
                return Err(anyhow!("failed to open worktree on the server"));
            };

            // `share_request` always sets `worktree` to `Some`, so this unwrap holds.
            let remote_id = share_request.worktree.as_ref().unwrap().id;
            let share_response = rpc.request(share_request).await?;

            log::info!("sharing worktree {:?}", share_response);
            let (snapshots_to_send_tx, snapshots_to_send_rx) =
                smol::channel::unbounded::<Snapshot>();

            cx.background()
                .spawn({
                    let rpc = rpc.clone();
                    async move {
                        let mut prev_snapshot = snapshot;
                        // Runs until `recv` fails, i.e. the channel is closed.
                        while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
                            let message = snapshot.build_update(&prev_snapshot, remote_id, false);
                            match rpc.send(message).await {
                                // Only advance the baseline when the diff was actually sent.
                                Ok(()) => prev_snapshot = snapshot,
                                Err(err) => log::error!("error sending snapshot diff {}", err),
                            }
                        }
                    }
                })
                .detach();

            this.update(&mut cx, |worktree, cx| {
                let _subscriptions = vec![
                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_add_collaborator),
                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_remove_collaborator),
                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_open_buffer),
                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_close_buffer),
                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_update_buffer),
                    rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_save_buffer),
                ];

                let worktree = worktree.as_local_mut().unwrap();
                worktree.share = Some(ShareState {
                    snapshots_tx: snapshots_to_send_tx,
                    _subscriptions,
                });
            });

            Ok(remote_id)
        })
    }
1362
1363 pub fn unshare(&mut self, cx: &mut ModelContext<Worktree>) {
1364 self.share.take();
1365 let rpc = self.client.clone();
1366 let remote_id = self.remote_id();
1367 cx.foreground()
1368 .spawn(
1369 async move {
1370 if let Some(worktree_id) = remote_id {
1371 rpc.send(proto::UnshareWorktree { worktree_id }).await?;
1372 }
1373 Ok(())
1374 }
1375 .log_err(),
1376 )
1377 .detach()
1378 }
1379
1380 fn share_request(&self, cx: &mut ModelContext<Worktree>) -> Task<Option<proto::ShareWorktree>> {
1381 let remote_id = self.next_remote_id();
1382 let snapshot = self.snapshot();
1383 let root_name = self.root_name.clone();
1384 cx.background().spawn(async move {
1385 remote_id.await.map(|id| {
1386 let entries = snapshot
1387 .entries_by_path
1388 .cursor::<()>()
1389 .filter(|e| !e.is_ignored)
1390 .map(Into::into)
1391 .collect();
1392 proto::ShareWorktree {
1393 worktree: Some(proto::Worktree {
1394 id,
1395 root_name,
1396 entries,
1397 }),
1398 }
1399 })
1400 })
1401 }
1402}
1403
1404fn build_gitignore(abs_path: &Path, fs: &dyn Fs) -> Result<Gitignore> {
1405 let contents = smol::block_on(fs.load(&abs_path))?;
1406 let parent = abs_path.parent().unwrap_or(Path::new("/"));
1407 let mut builder = GitignoreBuilder::new(parent);
1408 for line in contents.lines() {
1409 builder.add_line(Some(abs_path.into()), line)?;
1410 }
1411 Ok(builder.build()?)
1412}
1413
/// Lets a `LocalWorktree` be used wherever a `&Snapshot` is expected by dereferencing to its
/// current snapshot.
impl Deref for LocalWorktree {
    type Target = Snapshot;

    /// Returns a reference to the worktree's current snapshot.
    fn deref(&self) -> &Self::Target {
        &self.snapshot
    }
}
1421
1422impl fmt::Debug for LocalWorktree {
1423 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1424 self.snapshot.fmt(f)
1425 }
1426}
1427
/// State held by a local worktree while it is shared; created in `share` and dropped in
/// `unshare`.
struct ShareState {
    /// Sending half of the channel whose receiver diffs each snapshot against the previously
    /// sent one and forwards the update over RPC.
    snapshots_tx: Sender<Snapshot>,
    /// Subscriptions registered in `share`.
    // NOTE(review): presumably dropping these unsubscribes the handlers — confirm in `client`.
    _subscriptions: Vec<client::Subscription>,
}
1432
/// A worktree whose contents live on another peer and are accessed through `client`.
pub struct RemoteWorktree {
    /// Id sent as `worktree_id` in requests about this worktree.
    remote_id: u64,
    /// The worktree's current snapshot.
    snapshot: Snapshot,
    snapshot_rx: watch::Receiver<Snapshot>,
    /// RPC client used for buffer requests.
    client: Arc<Client>,
    /// Channel that `update_from_remote` forwards incoming `UpdateWorktree` payloads to.
    updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
    /// Replica id passed to `Buffer::from_proto` when opening buffers.
    replica_id: ReplicaId,
    /// Buffers keyed by their remote buffer id; each is either loaded or a list of pending
    /// operations.
    open_buffers: HashMap<usize, RemoteBuffer>,
    /// Known collaborators keyed by peer id.
    collaborators: HashMap<PeerId, Collaborator>,
    languages: Arc<LanguageRegistry>,
    user_store: ModelHandle<UserStore>,
    queued_operations: Vec<(u64, Operation)>,
    _subscriptions: Vec<client::Subscription>,
}
1447
1448impl RemoteWorktree {
    /// Opens the buffer for the worktree-relative `path`, requesting it from the remote host
    /// unless a live buffer for that path is already open.
    ///
    /// Resolves to the buffer handle. Fails if the worktree is released while the request is
    /// in flight, if the path has no entry in the snapshot, if the RPC fails, if the response
    /// carries no buffer, or if applying queued operations fails.
    pub fn open_buffer(
        &mut self,
        path: &Path,
        cx: &mut ModelContext<Worktree>,
    ) -> Task<Result<ModelHandle<Buffer>>> {
        let mut existing_buffer = None;
        // Prune entries that cannot be upgraded while looking for a live buffer at `path`.
        // NOTE(review): `RemoteBuffer::upgrade` returns `None` for `Operations`, so this also
        // discards operations queued for buffers that are still loading — confirm intended.
        self.open_buffers.retain(|_buffer_id, buffer| {
            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                if let Some(file) = buffer.read(cx.as_ref()).file() {
                    if file.worktree_id() == cx.model_id() && file.path().as_ref() == path {
                        existing_buffer = Some(buffer);
                    }
                }
                true
            } else {
                false
            }
        });

        let rpc = self.client.clone();
        let replica_id = self.replica_id;
        let remote_worktree_id = self.remote_id;
        let root_path = self.snapshot.abs_path.clone();
        let path = path.to_string_lossy().to_string();
        // The worktree is held weakly so a pending open does not keep it alive.
        cx.spawn_weak(|this, mut cx| async move {
            if let Some(existing_buffer) = existing_buffer {
                Ok(existing_buffer)
            } else {
                let entry = this
                    .upgrade(&cx)
                    .ok_or_else(|| anyhow!("worktree was closed"))?
                    .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned())
                    .ok_or_else(|| anyhow!("file does not exist"))?;
                let response = rpc
                    .request(proto::OpenBuffer {
                        worktree_id: remote_worktree_id as u64,
                        path,
                    })
                    .await?;

                // Upgrade again: the worktree may have been released during the request.
                let this = this
                    .upgrade(&cx)
                    .ok_or_else(|| anyhow!("worktree was closed"))?;
                let file = File {
                    entry_id: Some(entry.id),
                    worktree: this.clone(),
                    worktree_path: root_path,
                    path: entry.path,
                    mtime: entry.mtime,
                    is_local: false,
                };
                let language = this.read_with(&cx, |this, _| {
                    use language::File;
                    this.languages().select_language(file.full_path()).cloned()
                });
                let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
                let buffer_id = remote_buffer.id as usize;
                let buffer = cx.add_model(|cx| {
                    Buffer::from_proto(replica_id, remote_buffer, Some(Box::new(file)), cx)
                        .unwrap()
                        .with_language(language, None, cx)
                });
                this.update(&mut cx, |this, cx| {
                    let this = this.as_remote_mut().unwrap();
                    // If operations were queued under this id, apply them now that the buffer
                    // is loaded.
                    if let Some(RemoteBuffer::Operations(pending_ops)) = this
                        .open_buffers
                        .insert(buffer_id, RemoteBuffer::Loaded(buffer.downgrade()))
                    {
                        buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?;
                    }
                    Result::<_, anyhow::Error>::Ok(())
                })?;
                Ok(buffer)
            }
        })
    }
1525
    /// Returns the id sent as `worktree_id` in requests about this worktree.
    pub fn remote_id(&self) -> u64 {
        self.remote_id
    }
1529
1530 pub fn close_all_buffers(&mut self, cx: &mut MutableAppContext) {
1531 for (_, buffer) in self.open_buffers.drain() {
1532 if let RemoteBuffer::Loaded(buffer) = buffer {
1533 if let Some(buffer) = buffer.upgrade(cx) {
1534 buffer.update(cx, |buffer, cx| buffer.close(cx))
1535 }
1536 }
1537 }
1538 }
1539
1540 fn snapshot(&self) -> Snapshot {
1541 self.snapshot.clone()
1542 }
1543
1544 fn update_from_remote(
1545 &mut self,
1546 envelope: TypedEnvelope<proto::UpdateWorktree>,
1547 cx: &mut ModelContext<Worktree>,
1548 ) -> Result<()> {
1549 let mut tx = self.updates_tx.clone();
1550 let payload = envelope.payload.clone();
1551 cx.background()
1552 .spawn(async move {
1553 tx.send(payload).await.expect("receiver runs to completion");
1554 })
1555 .detach();
1556
1557 Ok(())
1558 }
1559
1560 pub fn add_collaborator(
1561 &mut self,
1562 collaborator: Collaborator,
1563 cx: &mut ModelContext<Worktree>,
1564 ) {
1565 self.collaborators
1566 .insert(collaborator.peer_id, collaborator);
1567 cx.notify();
1568 }
1569
1570 pub fn remove_collaborator(
1571 &mut self,
1572 envelope: TypedEnvelope<proto::RemoveCollaborator>,
1573 cx: &mut ModelContext<Worktree>,
1574 ) -> Result<()> {
1575 let peer_id = PeerId(envelope.payload.peer_id);
1576 let replica_id = self
1577 .collaborators
1578 .remove(&peer_id)
1579 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?
1580 .replica_id;
1581 for (_, buffer) in &self.open_buffers {
1582 if let Some(buffer) = buffer.upgrade(cx) {
1583 buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
1584 }
1585 }
1586 cx.notify();
1587 Ok(())
1588 }
1589}
1590
/// The state of a buffer tracked by a `RemoteWorktree`.
enum RemoteBuffer {
    /// Operations held for a buffer that is not loaded; `RemoteWorktree::open_buffer` applies
    /// them once the buffer has been created.
    Operations(Vec<Operation>),
    /// A loaded buffer, held weakly.
    Loaded(WeakModelHandle<Buffer>),
}
1595
1596impl RemoteBuffer {
1597 fn upgrade(&self, cx: &impl UpgradeModelHandle) -> Option<ModelHandle<Buffer>> {
1598 match self {
1599 Self::Operations(_) => None,
1600 Self::Loaded(buffer) => buffer.upgrade(cx),
1601 }
1602 }
1603}
1604
/// A cloneable view of a worktree's entries, indexed both by path and by entry id.
#[derive(Clone)]
pub struct Snapshot {
    id: usize,
    /// Counter stamped onto entries and ignores as they are inserted or updated;
    /// `apply_update` increments it.
    scan_id: usize,
    /// Path that worktree-relative paths are joined onto.
    abs_path: Arc<Path>,
    root_name: String,
    /// Character bag passed along when building entries.
    root_char_bag: CharBag,
    /// Parsed gitignores keyed by the directory they apply to, each paired with the scan id
    /// at which it was last inserted or touched.
    ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
    /// All entries, keyed by path.
    entries_by_path: SumTree<Entry>,
    /// Id-keyed index mapping each entry id to its path.
    entries_by_id: SumTree<PathEntry>,
    /// Ids of removed entries keyed by inode, so a re-inserted entry with the same inode can
    /// reuse its id (see `reuse_entry_id`).
    removed_entry_ids: HashMap<u64, usize>,
    /// Shared counter from which `Entry::new` draws ids.
    next_entry_id: Arc<AtomicUsize>,
}
1618
1619impl Snapshot {
    /// Returns this snapshot's `id` field.
    pub fn id(&self) -> usize {
        self.id
    }
1623
    /// Builds the `UpdateWorktree` message that describes how `self` differs from `other`.
    ///
    /// Both id-indexed entry lists are walked in lockstep:
    /// - an id only in `self`, or in both with differing `scan_id`s, becomes an updated entry;
    /// - an id only in `other` becomes a removed entry.
    ///
    /// Ignored entries are skipped on both sides unless `include_ignored` is set.
    // NOTE(review): the merge relies on both cursors yielding entries in ascending id order.
    pub fn build_update(
        &self,
        other: &Self,
        worktree_id: u64,
        include_ignored: bool,
    ) -> proto::UpdateWorktree {
        let mut updated_entries = Vec::new();
        let mut removed_entries = Vec::new();
        let mut self_entries = self
            .entries_by_id
            .cursor::<()>()
            .filter(|e| include_ignored || !e.is_ignored)
            .peekable();
        let mut other_entries = other
            .entries_by_id
            .cursor::<()>()
            .filter(|e| include_ignored || !e.is_ignored)
            .peekable();
        loop {
            match (self_entries.peek(), other_entries.peek()) {
                (Some(self_entry), Some(other_entry)) => {
                    match Ord::cmp(&self_entry.id, &other_entry.id) {
                        // Present only in `self`: a new entry.
                        Ordering::Less => {
                            let entry = self.entry_for_id(self_entry.id).unwrap().into();
                            updated_entries.push(entry);
                            self_entries.next();
                        }
                        // Present in both: report it only if it was touched by a different scan.
                        Ordering::Equal => {
                            if self_entry.scan_id != other_entry.scan_id {
                                let entry = self.entry_for_id(self_entry.id).unwrap().into();
                                updated_entries.push(entry);
                            }

                            self_entries.next();
                            other_entries.next();
                        }
                        // Present only in `other`: it has been removed.
                        Ordering::Greater => {
                            removed_entries.push(other_entry.id as u64);
                            other_entries.next();
                        }
                    }
                }
                // `other` is exhausted: everything left in `self` is new.
                (Some(self_entry), None) => {
                    let entry = self.entry_for_id(self_entry.id).unwrap().into();
                    updated_entries.push(entry);
                    self_entries.next();
                }
                // `self` is exhausted: everything left in `other` was removed.
                (None, Some(other_entry)) => {
                    removed_entries.push(other_entry.id as u64);
                    other_entries.next();
                }
                (None, None) => break,
            }
        }

        proto::UpdateWorktree {
            updated_entries,
            removed_entries,
            worktree_id,
        }
    }
1685
    /// Applies a remote `UpdateWorktree` message to this snapshot.
    ///
    /// Increments `scan_id`, removes the listed entries, and inserts the updated ones into
    /// both indexes, stamping their id-index rows with the new scan id. Fails if a removed
    /// entry id is unknown or an updated entry cannot be converted.
    fn apply_update(&mut self, update: proto::UpdateWorktree) -> Result<()> {
        self.scan_id += 1;
        let scan_id = self.scan_id;

        let mut entries_by_path_edits = Vec::new();
        let mut entries_by_id_edits = Vec::new();
        for entry_id in update.removed_entries {
            let entry_id = entry_id as usize;
            let entry = self
                .entry_for_id(entry_id)
                .ok_or_else(|| anyhow!("unknown entry"))?;
            entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
            entries_by_id_edits.push(Edit::Remove(entry.id));
        }

        for entry in update.updated_entries {
            let entry = Entry::try_from((&self.root_char_bag, entry))?;
            // If this id is already known, remove the row at its old path so the path index
            // does not keep a stale entry.
            if let Some(PathEntry { path, .. }) = self.entries_by_id.get(&entry.id, &()) {
                entries_by_path_edits.push(Edit::Remove(PathKey(path.clone())));
            }
            entries_by_id_edits.push(Edit::Insert(PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                is_ignored: entry.is_ignored,
                scan_id,
            }));
            entries_by_path_edits.push(Edit::Insert(entry));
        }

        self.entries_by_path.edit(entries_by_path_edits, &());
        self.entries_by_id.edit(entries_by_id_edits, &());

        Ok(())
    }
1720
1721 pub fn file_count(&self) -> usize {
1722 self.entries_by_path.summary().file_count
1723 }
1724
1725 pub fn visible_file_count(&self) -> usize {
1726 self.entries_by_path.summary().visible_file_count
1727 }
1728
1729 fn traverse_from_offset(
1730 &self,
1731 include_dirs: bool,
1732 include_ignored: bool,
1733 start_offset: usize,
1734 ) -> Traversal {
1735 let mut cursor = self.entries_by_path.cursor();
1736 cursor.seek(
1737 &TraversalTarget::Count {
1738 count: start_offset,
1739 include_dirs,
1740 include_ignored,
1741 },
1742 Bias::Right,
1743 &(),
1744 );
1745 Traversal {
1746 cursor,
1747 include_dirs,
1748 include_ignored,
1749 }
1750 }
1751
1752 fn traverse_from_path(
1753 &self,
1754 include_dirs: bool,
1755 include_ignored: bool,
1756 path: &Path,
1757 ) -> Traversal {
1758 let mut cursor = self.entries_by_path.cursor();
1759 cursor.seek(&TraversalTarget::Path(path), Bias::Left, &());
1760 Traversal {
1761 cursor,
1762 include_dirs,
1763 include_ignored,
1764 }
1765 }
1766
1767 pub fn files(&self, include_ignored: bool, start: usize) -> Traversal {
1768 self.traverse_from_offset(false, include_ignored, start)
1769 }
1770
1771 pub fn entries(&self, include_ignored: bool) -> Traversal {
1772 self.traverse_from_offset(true, include_ignored, 0)
1773 }
1774
1775 pub fn paths(&self) -> impl Iterator<Item = &Arc<Path>> {
1776 let empty_path = Path::new("");
1777 self.entries_by_path
1778 .cursor::<()>()
1779 .filter(move |entry| entry.path.as_ref() != empty_path)
1780 .map(|entry| &entry.path)
1781 }
1782
1783 fn child_entries<'a>(&'a self, parent_path: &'a Path) -> ChildEntriesIter<'a> {
1784 let mut cursor = self.entries_by_path.cursor();
1785 cursor.seek(&TraversalTarget::Path(parent_path), Bias::Right, &());
1786 let traversal = Traversal {
1787 cursor,
1788 include_dirs: true,
1789 include_ignored: true,
1790 };
1791 ChildEntriesIter {
1792 traversal,
1793 parent_path,
1794 }
1795 }
1796
1797 pub fn root_entry(&self) -> Option<&Entry> {
1798 self.entry_for_path("")
1799 }
1800
1801 pub fn root_name(&self) -> &str {
1802 &self.root_name
1803 }
1804
1805 pub fn entry_for_path(&self, path: impl AsRef<Path>) -> Option<&Entry> {
1806 let path = path.as_ref();
1807 self.traverse_from_path(true, true, path)
1808 .entry()
1809 .and_then(|entry| {
1810 if entry.path.as_ref() == path {
1811 Some(entry)
1812 } else {
1813 None
1814 }
1815 })
1816 }
1817
1818 pub fn entry_for_id(&self, id: usize) -> Option<&Entry> {
1819 let entry = self.entries_by_id.get(&id, &())?;
1820 self.entry_for_path(&entry.path)
1821 }
1822
1823 pub fn inode_for_path(&self, path: impl AsRef<Path>) -> Option<u64> {
1824 self.entry_for_path(path.as_ref()).map(|e| e.inode)
1825 }
1826
    /// Inserts `entry` into both indexes (replacing any existing row with the same key) and
    /// returns the entry as stored.
    ///
    /// If the entry is a `.gitignore` file, the file is parsed first and the result recorded
    /// for its parent directory; a parse failure is logged and otherwise ignored. The entry's
    /// id may be replaced by a reused id (see `reuse_entry_id`).
    fn insert_entry(&mut self, mut entry: Entry, fs: &dyn Fs) -> Entry {
        if !entry.is_dir() && entry.path.file_name() == Some(&GITIGNORE) {
            let abs_path = self.abs_path.join(&entry.path);
            match build_gitignore(&abs_path, fs) {
                Ok(ignore) => {
                    // The path has a file name, so it has a parent.
                    let ignore_dir_path = entry.path.parent().unwrap();
                    self.ignores
                        .insert(ignore_dir_path.into(), (Arc::new(ignore), self.scan_id));
                }
                Err(error) => {
                    log::error!(
                        "error loading .gitignore file {:?} - {:?}",
                        &entry.path,
                        error
                    );
                }
            }
        }

        self.reuse_entry_id(&mut entry);
        self.entries_by_path.insert_or_replace(entry.clone(), &());
        self.entries_by_id.insert_or_replace(
            PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                is_ignored: entry.is_ignored,
                scan_id: self.scan_id,
            },
            &(),
        );
        entry
    }
1859
    /// Marks the pending directory at `parent_path` as scanned and inserts its `entries`.
    ///
    /// If `ignore` is given it is recorded for `parent_path` with the current scan id.
    ///
    /// # Panics
    ///
    /// Panics if there is no entry at `parent_path`, or if that entry is not a
    /// `PendingDir`.
    fn populate_dir(
        &mut self,
        parent_path: Arc<Path>,
        entries: impl IntoIterator<Item = Entry>,
        ignore: Option<Arc<Gitignore>>,
    ) {
        let mut parent_entry = self
            .entries_by_path
            .get(&PathKey(parent_path.clone()), &())
            .unwrap()
            .clone();
        if let Some(ignore) = ignore {
            self.ignores.insert(parent_path, (ignore, self.scan_id));
        }
        if matches!(parent_entry.kind, EntryKind::PendingDir) {
            parent_entry.kind = EntryKind::Dir;
        } else {
            unreachable!();
        }

        // The parent is re-inserted so its kind changes from `PendingDir` to `Dir`.
        let mut entries_by_path_edits = vec![Edit::Insert(parent_entry)];
        let mut entries_by_id_edits = Vec::new();

        for mut entry in entries {
            self.reuse_entry_id(&mut entry);
            entries_by_id_edits.push(Edit::Insert(PathEntry {
                id: entry.id,
                path: entry.path.clone(),
                is_ignored: entry.is_ignored,
                scan_id: self.scan_id,
            }));
            entries_by_path_edits.push(Edit::Insert(entry));
        }

        self.entries_by_path.edit(entries_by_path_edits, &());
        self.entries_by_id.edit(entries_by_id_edits, &());
    }
1897
1898 fn reuse_entry_id(&mut self, entry: &mut Entry) {
1899 if let Some(removed_entry_id) = self.removed_entry_ids.remove(&entry.inode) {
1900 entry.id = removed_entry_id;
1901 } else if let Some(existing_entry) = self.entry_for_path(&entry.path) {
1902 entry.id = existing_entry.id;
1903 }
1904 }
1905
    /// Removes the entries in the range from `path` up to `PathSuccessor(path)` from both
    /// indexes.
    ///
    /// Each removed entry's id is remembered by inode (keeping the largest id per inode) so it
    /// can be reused later. If `path` names a `.gitignore`, the parent directory's ignore is
    /// stamped with the current scan id.
    fn remove_path(&mut self, path: &Path) {
        let mut new_entries;
        let removed_entries;
        {
            let mut cursor = self.entries_by_path.cursor::<TraversalProgress>();
            // Everything before `path` is kept...
            new_entries = cursor.slice(&TraversalTarget::Path(path), Bias::Left, &());
            // ...the range up to the successor of `path` is cut out...
            removed_entries = cursor.slice(&TraversalTarget::PathSuccessor(path), Bias::Left, &());
            // ...and the remainder is appended back.
            new_entries.push_tree(cursor.suffix(&()), &());
        }
        self.entries_by_path = new_entries;

        let mut entries_by_id_edits = Vec::new();
        for entry in removed_entries.cursor::<()>() {
            let removed_entry_id = self
                .removed_entry_ids
                .entry(entry.inode)
                .or_insert(entry.id);
            *removed_entry_id = cmp::max(*removed_entry_id, entry.id);
            entries_by_id_edits.push(Edit::Remove(entry.id));
        }
        self.entries_by_id.edit(entries_by_id_edits, &());

        if path.file_name() == Some(&GITIGNORE) {
            if let Some((_, scan_id)) = self.ignores.get_mut(path.parent().unwrap()) {
                *scan_id = self.scan_id;
            }
        }
    }
1934
1935 fn ignore_stack_for_path(&self, path: &Path, is_dir: bool) -> Arc<IgnoreStack> {
1936 let mut new_ignores = Vec::new();
1937 for ancestor in path.ancestors().skip(1) {
1938 if let Some((ignore, _)) = self.ignores.get(ancestor) {
1939 new_ignores.push((ancestor, Some(ignore.clone())));
1940 } else {
1941 new_ignores.push((ancestor, None));
1942 }
1943 }
1944
1945 let mut ignore_stack = IgnoreStack::none();
1946 for (parent_path, ignore) in new_ignores.into_iter().rev() {
1947 if ignore_stack.is_path_ignored(&parent_path, true) {
1948 ignore_stack = IgnoreStack::all();
1949 break;
1950 } else if let Some(ignore) = ignore {
1951 ignore_stack = ignore_stack.append(Arc::from(parent_path), ignore);
1952 }
1953 }
1954
1955 if ignore_stack.is_path_ignored(path, is_dir) {
1956 ignore_stack = IgnoreStack::all();
1957 }
1958
1959 ignore_stack
1960 }
1961}
1962
1963impl fmt::Debug for Snapshot {
1964 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1965 for entry in self.entries_by_path.cursor::<()>() {
1966 for _ in entry.path.ancestors().skip(1) {
1967 write!(f, " ")?;
1968 }
1969 writeln!(f, "{:?} (inode: {})", entry.path, entry.inode)?;
1970 }
1971 Ok(())
1972 }
1973}
1974
/// A handle to a file within a worktree, used as a buffer's `language::File`.
#[derive(Clone, PartialEq)]
pub struct File {
    /// Id of the file's worktree entry; `None` is reported as deleted by `is_deleted`.
    entry_id: Option<usize>,
    /// The worktree that contains this file.
    worktree: ModelHandle<Worktree>,
    /// The worktree's path, as captured when this handle was created.
    worktree_path: Arc<Path>,
    /// Path of the file relative to the worktree.
    pub path: Arc<Path>,
    /// Modification time taken from the entry this handle was built from.
    pub mtime: SystemTime,
    /// Whether the file belongs to a local worktree; only then does `abs_path` return a path.
    is_local: bool,
}
1984
1985impl language::File for File {
    /// Returns the id of the worktree model handle this file belongs to.
    fn worktree_id(&self) -> usize {
        self.worktree.id()
    }
1989
    /// Returns the id of the file's worktree entry, if it has one.
    fn entry_id(&self) -> Option<usize> {
        self.entry_id
    }
1993
    /// Returns the modification time recorded in this handle.
    fn mtime(&self) -> SystemTime {
        self.mtime
    }
1997
    /// Returns the file's path relative to its worktree.
    fn path(&self) -> &Arc<Path> {
        &self.path
    }
2001
2002 fn abs_path(&self) -> Option<PathBuf> {
2003 if self.is_local {
2004 Some(self.worktree_path.join(&self.path))
2005 } else {
2006 None
2007 }
2008 }
2009
2010 fn full_path(&self) -> PathBuf {
2011 let mut full_path = PathBuf::new();
2012 if let Some(worktree_name) = self.worktree_path.file_name() {
2013 full_path.push(worktree_name);
2014 }
2015 full_path.push(&self.path);
2016 full_path
2017 }
2018
2019 /// Returns the last component of this handle's absolute path. If this handle refers to the root
2020 /// of its worktree, then this method will return the name of the worktree itself.
2021 fn file_name<'a>(&'a self) -> Option<OsString> {
2022 self.path
2023 .file_name()
2024 .or_else(|| self.worktree_path.file_name())
2025 .map(Into::into)
2026 }
2027
2028 fn is_deleted(&self) -> bool {
2029 self.entry_id.is_none()
2030 }
2031
    /// Saves the buffer identified by `buffer_id`.
    ///
    /// For a local worktree, `text` is written to this file's path and, if the worktree has a
    /// remote id, a `BufferSaved` message is sent afterwards. For a remote worktree, a
    /// `SaveBuffer` request is sent and `text` / `version` are not used.
    ///
    /// Resolves to the saved version and the file's new modification time.
    fn save(
        &self,
        buffer_id: u64,
        text: Rope,
        version: clock::Global,
        cx: &mut MutableAppContext,
    ) -> Task<Result<(clock::Global, SystemTime)>> {
        self.worktree.update(cx, |worktree, cx| match worktree {
            Worktree::Local(worktree) => {
                let rpc = worktree.client.clone();
                // Read once up front; a remote id assigned later is not picked up by this save.
                let worktree_id = *worktree.remote_id.borrow();
                let save = worktree.save(self.path.clone(), text, cx);
                cx.background().spawn(async move {
                    let entry = save.await?;
                    if let Some(worktree_id) = worktree_id {
                        rpc.send(proto::BufferSaved {
                            worktree_id,
                            buffer_id,
                            version: (&version).into(),
                            mtime: Some(entry.mtime.into()),
                        })
                        .await?;
                    }
                    Ok((version, entry.mtime))
                })
            }
            Worktree::Remote(worktree) => {
                let rpc = worktree.client.clone();
                let worktree_id = worktree.remote_id;
                cx.foreground().spawn(async move {
                    let response = rpc
                        .request(proto::SaveBuffer {
                            worktree_id,
                            buffer_id,
                        })
                        .await?;
                    // The version and mtime come from the response.
                    let version = response.version.try_into()?;
                    let mtime = response
                        .mtime
                        .ok_or_else(|| anyhow!("missing mtime"))?
                        .into();
                    Ok((version, mtime))
                })
            }
        })
    }
2078
2079 fn load_local(&self, cx: &AppContext) -> Option<Task<Result<String>>> {
2080 let worktree = self.worktree.read(cx).as_local()?;
2081 let abs_path = worktree.absolutize(&self.path);
2082 let fs = worktree.fs.clone();
2083 Some(
2084 cx.background()
2085 .spawn(async move { fs.load(&abs_path).await }),
2086 )
2087 }
2088
    /// Passes `operation` for the buffer `buffer_id` to the worktree's `send_buffer_update`.
    fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) {
        self.worktree.update(cx, |worktree, cx| {
            worktree.send_buffer_update(buffer_id, operation, cx);
        });
    }
2094
2095 fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) {
2096 self.worktree.update(cx, |worktree, cx| {
2097 if let Worktree::Remote(worktree) = worktree {
2098 let worktree_id = worktree.remote_id;
2099 let rpc = worktree.client.clone();
2100 cx.background()
2101 .spawn(async move {
2102 if let Err(error) = rpc
2103 .send(proto::CloseBuffer {
2104 worktree_id,
2105 buffer_id,
2106 })
2107 .await
2108 {
2109 log::error!("error closing remote buffer: {}", error);
2110 }
2111 })
2112 .detach();
2113 }
2114 });
2115 }
2116
    /// Returns a boxed clone of this file handle as a `language::File` trait object.
    fn boxed_clone(&self) -> Box<dyn language::File> {
        Box::new(self.clone())
    }
2120
    /// Exposes this handle as `&dyn Any` so callers can downcast it back to `File`.
    fn as_any(&self) -> &dyn Any {
        self
    }
2124}
2125
/// A single file or directory tracked by a worktree snapshot.
#[derive(Clone, Debug)]
pub struct Entry {
    /// Identifier drawn from the snapshot's entry-id counter; may be reused for an entry with
    /// the same inode or path (see `Snapshot::reuse_entry_id`).
    pub id: usize,
    /// Whether this is a file, a scanned directory, or a directory still pending a scan.
    pub kind: EntryKind,
    /// Path of the entry relative to the worktree.
    pub path: Arc<Path>,
    /// Inode taken from the filesystem metadata.
    pub inode: u64,
    /// Modification time taken from the filesystem metadata.
    pub mtime: SystemTime,
    /// Whether the filesystem metadata reported a symlink.
    pub is_symlink: bool,
    /// Whether the entry is ignored; `Entry::new` initialises this to `false`.
    pub is_ignored: bool,
}
2136
/// The kind of a worktree entry.
#[derive(Clone, Debug)]
pub enum EntryKind {
    /// A directory that has not been populated yet; `Snapshot::populate_dir` turns it into
    /// `Dir`.
    PendingDir,
    /// A directory whose children have been populated.
    Dir,
    /// A file, together with the character bag computed for its path.
    File(CharBag),
}
2143
2144impl Entry {
2145 fn new(
2146 path: Arc<Path>,
2147 metadata: &fs::Metadata,
2148 next_entry_id: &AtomicUsize,
2149 root_char_bag: CharBag,
2150 ) -> Self {
2151 Self {
2152 id: next_entry_id.fetch_add(1, SeqCst),
2153 kind: if metadata.is_dir {
2154 EntryKind::PendingDir
2155 } else {
2156 EntryKind::File(char_bag_for_path(root_char_bag, &path))
2157 },
2158 path,
2159 inode: metadata.inode,
2160 mtime: metadata.mtime,
2161 is_symlink: metadata.is_symlink,
2162 is_ignored: false,
2163 }
2164 }
2165
2166 pub fn is_dir(&self) -> bool {
2167 matches!(self.kind, EntryKind::Dir | EntryKind::PendingDir)
2168 }
2169
2170 pub fn is_file(&self) -> bool {
2171 matches!(self.kind, EntryKind::File(_))
2172 }
2173}
2174
2175impl sum_tree::Item for Entry {
2176 type Summary = EntrySummary;
2177
2178 fn summary(&self) -> Self::Summary {
2179 let visible_count = if self.is_ignored { 0 } else { 1 };
2180 let file_count;
2181 let visible_file_count;
2182 if self.is_file() {
2183 file_count = 1;
2184 visible_file_count = visible_count;
2185 } else {
2186 file_count = 0;
2187 visible_file_count = 0;
2188 }
2189
2190 EntrySummary {
2191 max_path: self.path.clone(),
2192 count: 1,
2193 visible_count,
2194 file_count,
2195 visible_file_count,
2196 }
2197 }
2198}
2199
2200impl sum_tree::KeyedItem for Entry {
2201 type Key = PathKey;
2202
2203 fn key(&self) -> Self::Key {
2204 PathKey(self.path.clone())
2205 }
2206}
2207
/// Summary of a run of entries in the path-ordered entry tree.
#[derive(Clone, Debug)]
pub struct EntrySummary {
    /// Path of the right-most entry summarised (each `add_summary` overwrites it with the
    /// right-hand side's value).
    max_path: Arc<Path>,
    /// Entry count; a single entry contributes 1.
    count: usize,
    /// Number of entries that are not ignored.
    visible_count: usize,
    /// Number of file entries.
    file_count: usize,
    /// Number of file entries that are not ignored.
    visible_file_count: usize,
}
2216
2217impl Default for EntrySummary {
2218 fn default() -> Self {
2219 Self {
2220 max_path: Arc::from(Path::new("")),
2221 count: 0,
2222 visible_count: 0,
2223 file_count: 0,
2224 visible_file_count: 0,
2225 }
2226 }
2227}
2228
2229impl sum_tree::Summary for EntrySummary {
2230 type Context = ();
2231
2232 fn add_summary(&mut self, rhs: &Self, _: &()) {
2233 self.max_path = rhs.max_path.clone();
2234 self.visible_count += rhs.visible_count;
2235 self.file_count += rhs.file_count;
2236 self.visible_file_count += rhs.visible_file_count;
2237 }
2238}
2239
/// A row of the id-keyed index: maps an entry id to the entry's path.
#[derive(Clone, Debug)]
struct PathEntry {
    /// Id of the entry; also the row's key.
    id: usize,
    /// Path under which the entry is stored in the path index.
    path: Arc<Path>,
    /// Copy of the entry's `is_ignored` flag, used to filter without loading the entry.
    is_ignored: bool,
    /// Snapshot scan id at the time this row was written; `Snapshot::build_update` compares
    /// it to detect changed entries.
    scan_id: usize,
}
2247
2248impl sum_tree::Item for PathEntry {
2249 type Summary = PathEntrySummary;
2250
2251 fn summary(&self) -> Self::Summary {
2252 PathEntrySummary { max_id: self.id }
2253 }
2254}
2255
impl sum_tree::KeyedItem for PathEntry {
    type Key = usize;

    /// Rows are keyed by entry id.
    fn key(&self) -> Self::Key {
        self.id
    }
}
2263
/// Summary of a run of `PathEntry` rows.
#[derive(Clone, Debug, Default)]
struct PathEntrySummary {
    /// Id of the right-most row summarised (each `add_summary` overwrites it with the
    /// right-hand side's value).
    max_id: usize,
}
2268
impl sum_tree::Summary for PathEntrySummary {
    type Context = ();

    /// Folds `summary` into `self` by taking over its `max_id`.
    // NOTE(review): this equals the true maximum only if rows are summed in ascending id
    // order — presumably guaranteed by the id-keyed tree; confirm in `sum_tree`.
    fn add_summary(&mut self, summary: &Self, _: &Self::Context) {
        self.max_id = summary.max_id;
    }
}
2276
/// Lets a plain `usize` serve as a dimension over `PathEntrySummary`: it takes the value of
/// each summary's `max_id` as summaries are added.
impl<'a> sum_tree::Dimension<'a, PathEntrySummary> for usize {
    fn add_summary(&mut self, summary: &'a PathEntrySummary, _: &()) {
        *self = summary.max_id;
    }
}
2282
/// Key of the path-ordered entry tree: an entry's path, ordered by the derived `Ord`.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct PathKey(Arc<Path>);
2285
2286impl Default for PathKey {
2287 fn default() -> Self {
2288 Self(Path::new("").into())
2289 }
2290}
2291
2292impl<'a> sum_tree::Dimension<'a, EntrySummary> for PathKey {
2293 fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
2294 self.0 = summary.max_path.clone();
2295 }
2296}
2297
/// Scans a worktree's directory tree and processes filesystem events, updating a shared
/// snapshot and reporting its state over a channel.
struct BackgroundScanner {
    /// Filesystem used to read directories.
    fs: Arc<dyn Fs>,
    /// The snapshot being maintained, shared behind a mutex.
    snapshot: Arc<Mutex<Snapshot>>,
    /// Channel on which `ScanState` transitions are reported; `run` stops when a send fails.
    notify: Sender<ScanState>,
    /// Executor on which the directory-scanning workers are spawned.
    executor: Arc<executor::Background>,
}
2304
2305impl BackgroundScanner {
    /// Creates a scanner from its parts; no work starts until `run` is called.
    fn new(
        snapshot: Arc<Mutex<Snapshot>>,
        notify: Sender<ScanState>,
        fs: Arc<dyn Fs>,
        executor: Arc<executor::Background>,
    ) -> Self {
        Self {
            fs,
            snapshot,
            notify,
            executor,
        }
    }
2319
2320 fn abs_path(&self) -> Arc<Path> {
2321 self.snapshot.lock().abs_path.clone()
2322 }
2323
2324 fn snapshot(&self) -> Snapshot {
2325 self.snapshot.lock().clone()
2326 }
2327
    /// Runs the scanner to completion.
    ///
    /// Performs the initial directory scan, then processes each batch from `events_rx`. Each
    /// phase is bracketed by `Scanning` and `Idle` notifications; an initial-scan error is
    /// reported as `ScanState::Err` (followed by `Idle`). The scanner stops when a
    /// notification cannot be sent, when `process_events` returns `false`, or when the event
    /// stream ends.
    async fn run(mut self, events_rx: impl Stream<Item = Vec<fsevent::Event>>) {
        if self.notify.send(ScanState::Scanning).await.is_err() {
            return;
        }

        if let Err(err) = self.scan_dirs().await {
            if self
                .notify
                .send(ScanState::Err(Arc::new(err)))
                .await
                .is_err()
            {
                return;
            }
        }

        if self.notify.send(ScanState::Idle).await.is_err() {
            return;
        }

        // Pin the stream so it can be polled with `next()`.
        futures::pin_mut!(events_rx);
        while let Some(events) = events_rx.next().await {
            if self.notify.send(ScanState::Scanning).await.is_err() {
                break;
            }

            if !self.process_events(events).await {
                break;
            }

            if self.notify.send(ScanState::Idle).await.is_err() {
                break;
            }
        }
    }
2363
2364 async fn scan_dirs(&mut self) -> Result<()> {
2365 let root_char_bag;
2366 let next_entry_id;
2367 let is_dir;
2368 {
2369 let snapshot = self.snapshot.lock();
2370 root_char_bag = snapshot.root_char_bag;
2371 next_entry_id = snapshot.next_entry_id.clone();
2372 is_dir = snapshot.root_entry().map_or(false, |e| e.is_dir())
2373 };
2374
2375 if is_dir {
2376 let path: Arc<Path> = Arc::from(Path::new(""));
2377 let abs_path = self.abs_path();
2378 let (tx, rx) = channel::unbounded();
2379 tx.send(ScanJob {
2380 abs_path: abs_path.to_path_buf(),
2381 path,
2382 ignore_stack: IgnoreStack::none(),
2383 scan_queue: tx.clone(),
2384 })
2385 .await
2386 .unwrap();
2387 drop(tx);
2388
2389 self.executor
2390 .scoped(|scope| {
2391 for _ in 0..self.executor.num_cpus() {
2392 scope.spawn(async {
2393 while let Ok(job) = rx.recv().await {
2394 if let Err(err) = self
2395 .scan_dir(root_char_bag, next_entry_id.clone(), &job)
2396 .await
2397 {
2398 log::error!("error scanning {:?}: {}", job.abs_path, err);
2399 }
2400 }
2401 });
2402 }
2403 })
2404 .await;
2405 }
2406
2407 Ok(())
2408 }
2409
2410 async fn scan_dir(
2411 &self,
2412 root_char_bag: CharBag,
2413 next_entry_id: Arc<AtomicUsize>,
2414 job: &ScanJob,
2415 ) -> Result<()> {
2416 let mut new_entries: Vec<Entry> = Vec::new();
2417 let mut new_jobs: Vec<ScanJob> = Vec::new();
2418 let mut ignore_stack = job.ignore_stack.clone();
2419 let mut new_ignore = None;
2420
2421 let mut child_paths = self.fs.read_dir(&job.abs_path).await?;
2422 while let Some(child_abs_path) = child_paths.next().await {
2423 let child_abs_path = match child_abs_path {
2424 Ok(child_abs_path) => child_abs_path,
2425 Err(error) => {
2426 log::error!("error processing entry {:?}", error);
2427 continue;
2428 }
2429 };
2430 let child_name = child_abs_path.file_name().unwrap();
2431 let child_path: Arc<Path> = job.path.join(child_name).into();
2432 let child_metadata = match self.fs.metadata(&child_abs_path).await? {
2433 Some(metadata) => metadata,
2434 None => continue,
2435 };
2436
2437 // If we find a .gitignore, add it to the stack of ignores used to determine which paths are ignored
2438 if child_name == *GITIGNORE {
2439 match build_gitignore(&child_abs_path, self.fs.as_ref()) {
2440 Ok(ignore) => {
2441 let ignore = Arc::new(ignore);
2442 ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
2443 new_ignore = Some(ignore);
2444 }
2445 Err(error) => {
2446 log::error!(
2447 "error loading .gitignore file {:?} - {:?}",
2448 child_name,
2449 error
2450 );
2451 }
2452 }
2453
2454 // Update ignore status of any child entries we've already processed to reflect the
2455 // ignore file in the current directory. Because `.gitignore` starts with a `.`,
2456 // there should rarely be too numerous. Update the ignore stack associated with any
2457 // new jobs as well.
2458 let mut new_jobs = new_jobs.iter_mut();
2459 for entry in &mut new_entries {
2460 entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
2461 if entry.is_dir() {
2462 new_jobs.next().unwrap().ignore_stack = if entry.is_ignored {
2463 IgnoreStack::all()
2464 } else {
2465 ignore_stack.clone()
2466 };
2467 }
2468 }
2469 }
2470
2471 let mut child_entry = Entry::new(
2472 child_path.clone(),
2473 &child_metadata,
2474 &next_entry_id,
2475 root_char_bag,
2476 );
2477
2478 if child_metadata.is_dir {
2479 let is_ignored = ignore_stack.is_path_ignored(&child_path, true);
2480 child_entry.is_ignored = is_ignored;
2481 new_entries.push(child_entry);
2482 new_jobs.push(ScanJob {
2483 abs_path: child_abs_path,
2484 path: child_path,
2485 ignore_stack: if is_ignored {
2486 IgnoreStack::all()
2487 } else {
2488 ignore_stack.clone()
2489 },
2490 scan_queue: job.scan_queue.clone(),
2491 });
2492 } else {
2493 child_entry.is_ignored = ignore_stack.is_path_ignored(&child_path, false);
2494 new_entries.push(child_entry);
2495 };
2496 }
2497
2498 self.snapshot
2499 .lock()
2500 .populate_dir(job.path.clone(), new_entries, new_ignore);
2501 for new_job in new_jobs {
2502 job.scan_queue.send(new_job).await.unwrap();
2503 }
2504
2505 Ok(())
2506 }
2507
2508 async fn process_events(&mut self, mut events: Vec<fsevent::Event>) -> bool {
2509 let mut snapshot = self.snapshot();
2510 snapshot.scan_id += 1;
2511
2512 let root_abs_path = if let Ok(abs_path) = self.fs.canonicalize(&snapshot.abs_path).await {
2513 abs_path
2514 } else {
2515 return false;
2516 };
2517 let root_char_bag = snapshot.root_char_bag;
2518 let next_entry_id = snapshot.next_entry_id.clone();
2519
2520 events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
2521 events.dedup_by(|a, b| a.path.starts_with(&b.path));
2522
2523 for event in &events {
2524 match event.path.strip_prefix(&root_abs_path) {
2525 Ok(path) => snapshot.remove_path(&path),
2526 Err(_) => {
2527 log::error!(
2528 "unexpected event {:?} for root path {:?}",
2529 event.path,
2530 root_abs_path
2531 );
2532 continue;
2533 }
2534 }
2535 }
2536
2537 let (scan_queue_tx, scan_queue_rx) = channel::unbounded();
2538 for event in events {
2539 let path: Arc<Path> = match event.path.strip_prefix(&root_abs_path) {
2540 Ok(path) => Arc::from(path.to_path_buf()),
2541 Err(_) => {
2542 log::error!(
2543 "unexpected event {:?} for root path {:?}",
2544 event.path,
2545 root_abs_path
2546 );
2547 continue;
2548 }
2549 };
2550
2551 match self.fs.metadata(&event.path).await {
2552 Ok(Some(metadata)) => {
2553 let ignore_stack = snapshot.ignore_stack_for_path(&path, metadata.is_dir);
2554 let mut fs_entry = Entry::new(
2555 path.clone(),
2556 &metadata,
2557 snapshot.next_entry_id.as_ref(),
2558 snapshot.root_char_bag,
2559 );
2560 fs_entry.is_ignored = ignore_stack.is_all();
2561 snapshot.insert_entry(fs_entry, self.fs.as_ref());
2562 if metadata.is_dir {
2563 scan_queue_tx
2564 .send(ScanJob {
2565 abs_path: event.path,
2566 path,
2567 ignore_stack,
2568 scan_queue: scan_queue_tx.clone(),
2569 })
2570 .await
2571 .unwrap();
2572 }
2573 }
2574 Ok(None) => {}
2575 Err(err) => {
2576 // TODO - create a special 'error' entry in the entries tree to mark this
2577 log::error!("error reading file on event {:?}", err);
2578 }
2579 }
2580 }
2581
2582 *self.snapshot.lock() = snapshot;
2583
2584 // Scan any directories that were created as part of this event batch.
2585 drop(scan_queue_tx);
2586 self.executor
2587 .scoped(|scope| {
2588 for _ in 0..self.executor.num_cpus() {
2589 scope.spawn(async {
2590 while let Ok(job) = scan_queue_rx.recv().await {
2591 if let Err(err) = self
2592 .scan_dir(root_char_bag, next_entry_id.clone(), &job)
2593 .await
2594 {
2595 log::error!("error scanning {:?}: {}", job.abs_path, err);
2596 }
2597 }
2598 });
2599 }
2600 })
2601 .await;
2602
2603 // Attempt to detect renames only over a single batch of file-system events.
2604 self.snapshot.lock().removed_entry_ids.clear();
2605
2606 self.update_ignore_statuses().await;
2607 true
2608 }
2609
2610 async fn update_ignore_statuses(&self) {
2611 let mut snapshot = self.snapshot();
2612
2613 let mut ignores_to_update = Vec::new();
2614 let mut ignores_to_delete = Vec::new();
2615 for (parent_path, (_, scan_id)) in &snapshot.ignores {
2616 if *scan_id == snapshot.scan_id && snapshot.entry_for_path(parent_path).is_some() {
2617 ignores_to_update.push(parent_path.clone());
2618 }
2619
2620 let ignore_path = parent_path.join(&*GITIGNORE);
2621 if snapshot.entry_for_path(ignore_path).is_none() {
2622 ignores_to_delete.push(parent_path.clone());
2623 }
2624 }
2625
2626 for parent_path in ignores_to_delete {
2627 snapshot.ignores.remove(&parent_path);
2628 self.snapshot.lock().ignores.remove(&parent_path);
2629 }
2630
2631 let (ignore_queue_tx, ignore_queue_rx) = channel::unbounded();
2632 ignores_to_update.sort_unstable();
2633 let mut ignores_to_update = ignores_to_update.into_iter().peekable();
2634 while let Some(parent_path) = ignores_to_update.next() {
2635 while ignores_to_update
2636 .peek()
2637 .map_or(false, |p| p.starts_with(&parent_path))
2638 {
2639 ignores_to_update.next().unwrap();
2640 }
2641
2642 let ignore_stack = snapshot.ignore_stack_for_path(&parent_path, true);
2643 ignore_queue_tx
2644 .send(UpdateIgnoreStatusJob {
2645 path: parent_path,
2646 ignore_stack,
2647 ignore_queue: ignore_queue_tx.clone(),
2648 })
2649 .await
2650 .unwrap();
2651 }
2652 drop(ignore_queue_tx);
2653
2654 self.executor
2655 .scoped(|scope| {
2656 for _ in 0..self.executor.num_cpus() {
2657 scope.spawn(async {
2658 while let Ok(job) = ignore_queue_rx.recv().await {
2659 self.update_ignore_status(job, &snapshot).await;
2660 }
2661 });
2662 }
2663 })
2664 .await;
2665 }
2666
2667 async fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) {
2668 let mut ignore_stack = job.ignore_stack;
2669 if let Some((ignore, _)) = snapshot.ignores.get(&job.path) {
2670 ignore_stack = ignore_stack.append(job.path.clone(), ignore.clone());
2671 }
2672
2673 let mut entries_by_id_edits = Vec::new();
2674 let mut entries_by_path_edits = Vec::new();
2675 for mut entry in snapshot.child_entries(&job.path).cloned() {
2676 let was_ignored = entry.is_ignored;
2677 entry.is_ignored = ignore_stack.is_path_ignored(&entry.path, entry.is_dir());
2678 if entry.is_dir() {
2679 let child_ignore_stack = if entry.is_ignored {
2680 IgnoreStack::all()
2681 } else {
2682 ignore_stack.clone()
2683 };
2684 job.ignore_queue
2685 .send(UpdateIgnoreStatusJob {
2686 path: entry.path.clone(),
2687 ignore_stack: child_ignore_stack,
2688 ignore_queue: job.ignore_queue.clone(),
2689 })
2690 .await
2691 .unwrap();
2692 }
2693
2694 if entry.is_ignored != was_ignored {
2695 let mut path_entry = snapshot.entries_by_id.get(&entry.id, &()).unwrap().clone();
2696 path_entry.scan_id = snapshot.scan_id;
2697 path_entry.is_ignored = entry.is_ignored;
2698 entries_by_id_edits.push(Edit::Insert(path_entry));
2699 entries_by_path_edits.push(Edit::Insert(entry));
2700 }
2701 }
2702
2703 let mut snapshot = self.snapshot.lock();
2704 snapshot.entries_by_path.edit(entries_by_path_edits, &());
2705 snapshot.entries_by_id.edit(entries_by_id_edits, &());
2706 }
2707}
2708
2709async fn refresh_entry(
2710 fs: &dyn Fs,
2711 snapshot: &Mutex<Snapshot>,
2712 path: Arc<Path>,
2713 abs_path: &Path,
2714) -> Result<Entry> {
2715 let root_char_bag;
2716 let next_entry_id;
2717 {
2718 let snapshot = snapshot.lock();
2719 root_char_bag = snapshot.root_char_bag;
2720 next_entry_id = snapshot.next_entry_id.clone();
2721 }
2722 let entry = Entry::new(
2723 path,
2724 &fs.metadata(abs_path)
2725 .await?
2726 .ok_or_else(|| anyhow!("could not read saved file metadata"))?,
2727 &next_entry_id,
2728 root_char_bag,
2729 );
2730 Ok(snapshot.lock().insert_entry(entry, fs))
2731}
2732
2733fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag {
2734 let mut result = root_char_bag;
2735 result.extend(
2736 path.to_string_lossy()
2737 .chars()
2738 .map(|c| c.to_ascii_lowercase()),
2739 );
2740 result
2741}
2742
/// A request to scan one directory, passed between scan workers.
struct ScanJob {
    /// Absolute path of the directory to list.
    abs_path: PathBuf,
    /// Path of the same directory relative to the worktree root (the root
    /// itself is the empty path).
    path: Arc<Path>,
    /// Ignore rules in effect for the directory's children.
    ignore_stack: Arc<IgnoreStack>,
    /// Queue on which jobs for discovered subdirectories are sent.
    scan_queue: Sender<ScanJob>,
}
2749
/// A request to re-evaluate the ignore status of one directory's children.
struct UpdateIgnoreStatusJob {
    /// Directory whose children are re-evaluated.
    path: Arc<Path>,
    /// Ignore rules inherited from above `path`; the directory's own ignore
    /// file, if recorded, is appended when the job runs.
    ignore_stack: Arc<IgnoreStack>,
    /// Queue on which follow-up jobs for child directories are sent.
    ignore_queue: Sender<UpdateIgnoreStatusJob>,
}
2755
/// Extension methods on worktree handles. Outside of test builds the trait
/// has no methods.
pub trait WorktreeHandle {
    /// Returns a future that resolves once file-system events that were
    /// already pending for the worktree have been processed (test-only).
    #[cfg(test)]
    fn flush_fs_events<'a>(
        &self,
        cx: &'a gpui::TestAppContext,
    ) -> futures::future::LocalBoxFuture<'a, ()>;
}
2763
2764impl WorktreeHandle for ModelHandle<Worktree> {
2765 // When the worktree's FS event stream sometimes delivers "redundant" events for FS changes that
2766 // occurred before the worktree was constructed. These events can cause the worktree to perfrom
2767 // extra directory scans, and emit extra scan-state notifications.
2768 //
2769 // This function mutates the worktree's directory and waits for those mutations to be picked up,
2770 // to ensure that all redundant FS events have already been processed.
2771 #[cfg(test)]
2772 fn flush_fs_events<'a>(
2773 &self,
2774 cx: &'a gpui::TestAppContext,
2775 ) -> futures::future::LocalBoxFuture<'a, ()> {
2776 use smol::future::FutureExt;
2777
2778 let filename = "fs-event-sentinel";
2779 let root_path = cx.read(|cx| self.read(cx).abs_path.clone());
2780 let tree = self.clone();
2781 async move {
2782 std::fs::write(root_path.join(filename), "").unwrap();
2783 tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_some())
2784 .await;
2785
2786 std::fs::remove_file(root_path.join(filename)).unwrap();
2787 tree.condition(&cx, |tree, _| tree.entry_for_path(filename).is_none())
2788 .await;
2789
2790 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
2791 .await;
2792 }
2793 .boxed_local()
2794 }
2795}
2796
/// Cursor dimension used by `Traversal`: the last path passed plus running
/// totals accumulated from the `EntrySummary` values passed so far.
#[derive(Clone, Debug)]
struct TraversalProgress<'a> {
    /// `max_path` of the most recently added summary.
    max_path: &'a Path,
    /// Running total of `EntrySummary::count`.
    count: usize,
    /// Running total of `EntrySummary::visible_count`.
    visible_count: usize,
    /// Running total of `EntrySummary::file_count`.
    file_count: usize,
    /// Running total of `EntrySummary::visible_file_count`.
    visible_file_count: usize,
}
2805
2806impl<'a> TraversalProgress<'a> {
2807 fn count(&self, include_dirs: bool, include_ignored: bool) -> usize {
2808 match (include_ignored, include_dirs) {
2809 (true, true) => self.count,
2810 (true, false) => self.file_count,
2811 (false, true) => self.visible_count,
2812 (false, false) => self.visible_file_count,
2813 }
2814 }
2815}
2816
2817impl<'a> sum_tree::Dimension<'a, EntrySummary> for TraversalProgress<'a> {
2818 fn add_summary(&mut self, summary: &'a EntrySummary, _: &()) {
2819 self.max_path = summary.max_path.as_ref();
2820 self.count += summary.count;
2821 self.visible_count += summary.visible_count;
2822 self.file_count += summary.file_count;
2823 self.visible_file_count += summary.visible_file_count;
2824 }
2825}
2826
2827impl<'a> Default for TraversalProgress<'a> {
2828 fn default() -> Self {
2829 Self {
2830 max_path: Path::new(""),
2831 count: 0,
2832 visible_count: 0,
2833 file_count: 0,
2834 visible_file_count: 0,
2835 }
2836 }
2837}
2838
/// A filtered, ordered walk over a tree of entries.
pub struct Traversal<'a> {
    /// Cursor over the entries, tracking `TraversalProgress` as it moves.
    cursor: sum_tree::Cursor<'a, Entry, TraversalProgress<'a>>,
    /// Whether ignored entries count towards offsets and are yielded.
    include_ignored: bool,
    /// Whether directory entries count towards offsets and are yielded.
    include_dirs: bool,
}
2844
2845impl<'a> Traversal<'a> {
2846 pub fn advance(&mut self) -> bool {
2847 self.advance_to_offset(self.offset() + 1)
2848 }
2849
2850 pub fn advance_to_offset(&mut self, offset: usize) -> bool {
2851 self.cursor.seek_forward(
2852 &TraversalTarget::Count {
2853 count: offset,
2854 include_dirs: self.include_dirs,
2855 include_ignored: self.include_ignored,
2856 },
2857 Bias::Right,
2858 &(),
2859 )
2860 }
2861
2862 pub fn advance_to_sibling(&mut self) -> bool {
2863 while let Some(entry) = self.cursor.item() {
2864 self.cursor.seek_forward(
2865 &TraversalTarget::PathSuccessor(&entry.path),
2866 Bias::Left,
2867 &(),
2868 );
2869 if let Some(entry) = self.cursor.item() {
2870 if (self.include_dirs || !entry.is_dir())
2871 && (self.include_ignored || !entry.is_ignored)
2872 {
2873 return true;
2874 }
2875 }
2876 }
2877 false
2878 }
2879
2880 pub fn entry(&self) -> Option<&'a Entry> {
2881 self.cursor.item()
2882 }
2883
2884 pub fn offset(&self) -> usize {
2885 self.cursor
2886 .start()
2887 .count(self.include_dirs, self.include_ignored)
2888 }
2889}
2890
2891impl<'a> Iterator for Traversal<'a> {
2892 type Item = &'a Entry;
2893
2894 fn next(&mut self) -> Option<Self::Item> {
2895 if let Some(item) = self.entry() {
2896 self.advance();
2897 Some(item)
2898 } else {
2899 None
2900 }
2901 }
2902}
2903
/// Positions that a `Traversal` cursor can seek to.
#[derive(Debug)]
enum TraversalTarget<'a> {
    /// Seek by comparing this path with the cursor's `max_path`.
    Path(&'a Path),
    /// Seek past every position whose `max_path` starts with this path,
    /// i.e. past the path itself and everything beneath it.
    PathSuccessor(&'a Path),
    /// Seek to an offset counted over the entries selected by the two flags.
    Count {
        /// Offset to reach.
        count: usize,
        /// Whether ignored entries are counted.
        include_ignored: bool,
        /// Whether directory entries are counted.
        include_dirs: bool,
    },
}
2914
2915impl<'a, 'b> SeekTarget<'a, EntrySummary, TraversalProgress<'a>> for TraversalTarget<'b> {
2916 fn cmp(&self, cursor_location: &TraversalProgress<'a>, _: &()) -> Ordering {
2917 match self {
2918 TraversalTarget::Path(path) => path.cmp(&cursor_location.max_path),
2919 TraversalTarget::PathSuccessor(path) => {
2920 if !cursor_location.max_path.starts_with(path) {
2921 Ordering::Equal
2922 } else {
2923 Ordering::Greater
2924 }
2925 }
2926 TraversalTarget::Count {
2927 count,
2928 include_dirs,
2929 include_ignored,
2930 } => Ord::cmp(
2931 count,
2932 &cursor_location.count(*include_dirs, *include_ignored),
2933 ),
2934 }
2935 }
2936}
2937
/// Iterator over entries located under `parent_path`, stepping from sibling
/// to sibling rather than descending into each yielded entry.
struct ChildEntriesIter<'a> {
    /// Path that every yielded entry's path must start with.
    parent_path: &'a Path,
    /// Underlying traversal; its current entry is the next candidate.
    traversal: Traversal<'a>,
}
2942
2943impl<'a> Iterator for ChildEntriesIter<'a> {
2944 type Item = &'a Entry;
2945
2946 fn next(&mut self) -> Option<Self::Item> {
2947 if let Some(item) = self.traversal.entry() {
2948 if item.path.starts_with(&self.parent_path) {
2949 self.traversal.advance_to_sibling();
2950 return Some(item);
2951 }
2952 }
2953 None
2954 }
2955}
2956
2957impl<'a> From<&'a Entry> for proto::Entry {
2958 fn from(entry: &'a Entry) -> Self {
2959 Self {
2960 id: entry.id as u64,
2961 is_dir: entry.is_dir(),
2962 path: entry.path.to_string_lossy().to_string(),
2963 inode: entry.inode,
2964 mtime: Some(entry.mtime.into()),
2965 is_symlink: entry.is_symlink,
2966 is_ignored: entry.is_ignored,
2967 }
2968 }
2969}
2970
2971impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
2972 type Error = anyhow::Error;
2973
2974 fn try_from((root_char_bag, entry): (&'a CharBag, proto::Entry)) -> Result<Self> {
2975 if let Some(mtime) = entry.mtime {
2976 let kind = if entry.is_dir {
2977 EntryKind::Dir
2978 } else {
2979 let mut char_bag = root_char_bag.clone();
2980 char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase()));
2981 EntryKind::File(char_bag)
2982 };
2983 let path: Arc<Path> = Arc::from(Path::new(&entry.path));
2984 Ok(Entry {
2985 id: entry.id as usize,
2986 kind,
2987 path: path.clone(),
2988 inode: entry.inode,
2989 mtime: mtime.into(),
2990 is_symlink: entry.is_symlink,
2991 is_ignored: entry.is_ignored,
2992 })
2993 } else {
2994 Err(anyhow!(
2995 "missing mtime in remote worktree entry {:?}",
2996 entry.path
2997 ))
2998 }
2999 }
3000}
3001
3002#[cfg(test)]
3003mod tests {
3004 use super::*;
3005 use crate::fs::FakeFs;
3006 use anyhow::Result;
3007 use client::test::{FakeHttpClient, FakeServer};
3008 use fs::RealFs;
3009 use language::{tree_sitter_rust, LanguageServerConfig};
3010 use language::{Diagnostic, LanguageConfig};
3011 use lsp::Url;
3012 use rand::prelude::*;
3013 use serde_json::json;
3014 use std::{cell::RefCell, rc::Rc};
3015 use std::{
3016 env,
3017 fmt::Write,
3018 time::{SystemTime, UNIX_EPOCH},
3019 };
3020 use text::Point;
3021 use util::test::temp_tree;
3022
3023 #[gpui::test]
3024 async fn test_traversal(mut cx: gpui::TestAppContext) {
3025 let fs = FakeFs::new();
3026 fs.insert_tree(
3027 "/root",
3028 json!({
3029 ".gitignore": "a/b\n",
3030 "a": {
3031 "b": "",
3032 "c": "",
3033 }
3034 }),
3035 )
3036 .await;
3037
3038 let client = Client::new();
3039 let http_client = FakeHttpClient::with_404_response();
3040 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3041
3042 let tree = Worktree::open_local(
3043 client,
3044 user_store,
3045 Arc::from(Path::new("/root")),
3046 Arc::new(fs),
3047 Default::default(),
3048 &mut cx.to_async(),
3049 )
3050 .await
3051 .unwrap();
3052 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3053 .await;
3054
3055 tree.read_with(&cx, |tree, _| {
3056 assert_eq!(
3057 tree.entries(false)
3058 .map(|entry| entry.path.as_ref())
3059 .collect::<Vec<_>>(),
3060 vec![
3061 Path::new(""),
3062 Path::new(".gitignore"),
3063 Path::new("a"),
3064 Path::new("a/c"),
3065 ]
3066 );
3067 })
3068 }
3069
3070 #[gpui::test]
3071 async fn test_save_file(mut cx: gpui::TestAppContext) {
3072 let dir = temp_tree(json!({
3073 "file1": "the old contents",
3074 }));
3075
3076 let client = Client::new();
3077 let http_client = FakeHttpClient::with_404_response();
3078 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3079
3080 let tree = Worktree::open_local(
3081 client,
3082 user_store,
3083 dir.path(),
3084 Arc::new(RealFs),
3085 Default::default(),
3086 &mut cx.to_async(),
3087 )
3088 .await
3089 .unwrap();
3090 let buffer = tree
3091 .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
3092 .await
3093 .unwrap();
3094 let save = buffer.update(&mut cx, |buffer, cx| {
3095 buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
3096 buffer.save(cx).unwrap()
3097 });
3098 save.await.unwrap();
3099
3100 let new_text = std::fs::read_to_string(dir.path().join("file1")).unwrap();
3101 assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
3102 }
3103
3104 #[gpui::test]
3105 async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) {
3106 let dir = temp_tree(json!({
3107 "file1": "the old contents",
3108 }));
3109 let file_path = dir.path().join("file1");
3110
3111 let client = Client::new();
3112 let http_client = FakeHttpClient::with_404_response();
3113 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3114
3115 let tree = Worktree::open_local(
3116 client,
3117 user_store,
3118 file_path.clone(),
3119 Arc::new(RealFs),
3120 Default::default(),
3121 &mut cx.to_async(),
3122 )
3123 .await
3124 .unwrap();
3125 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3126 .await;
3127 cx.read(|cx| assert_eq!(tree.read(cx).file_count(), 1));
3128
3129 let buffer = tree
3130 .update(&mut cx, |tree, cx| tree.open_buffer("", cx))
3131 .await
3132 .unwrap();
3133 let save = buffer.update(&mut cx, |buffer, cx| {
3134 buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
3135 buffer.save(cx).unwrap()
3136 });
3137 save.await.unwrap();
3138
3139 let new_text = std::fs::read_to_string(file_path).unwrap();
3140 assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
3141 }
3142
3143 #[gpui::test]
3144 async fn test_rescan_and_remote_updates(mut cx: gpui::TestAppContext) {
3145 let dir = temp_tree(json!({
3146 "a": {
3147 "file1": "",
3148 "file2": "",
3149 "file3": "",
3150 },
3151 "b": {
3152 "c": {
3153 "file4": "",
3154 "file5": "",
3155 }
3156 }
3157 }));
3158
3159 let user_id = 5;
3160 let mut client = Client::new();
3161 let server = FakeServer::for_client(user_id, &mut client, &cx).await;
3162 let user_store = server.build_user_store(client.clone(), &mut cx).await;
3163 let tree = Worktree::open_local(
3164 client,
3165 user_store.clone(),
3166 dir.path(),
3167 Arc::new(RealFs),
3168 Default::default(),
3169 &mut cx.to_async(),
3170 )
3171 .await
3172 .unwrap();
3173
3174 let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| {
3175 let buffer = tree.update(cx, |tree, cx| tree.open_buffer(path, cx));
3176 async move { buffer.await.unwrap() }
3177 };
3178 let id_for_path = |path: &'static str, cx: &gpui::TestAppContext| {
3179 tree.read_with(cx, |tree, _| {
3180 tree.entry_for_path(path)
3181 .expect(&format!("no entry for path {}", path))
3182 .id
3183 })
3184 };
3185
3186 let buffer2 = buffer_for_path("a/file2", &mut cx).await;
3187 let buffer3 = buffer_for_path("a/file3", &mut cx).await;
3188 let buffer4 = buffer_for_path("b/c/file4", &mut cx).await;
3189 let buffer5 = buffer_for_path("b/c/file5", &mut cx).await;
3190
3191 let file2_id = id_for_path("a/file2", &cx);
3192 let file3_id = id_for_path("a/file3", &cx);
3193 let file4_id = id_for_path("b/c/file4", &cx);
3194
3195 // Wait for the initial scan.
3196 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3197 .await;
3198
3199 // Create a remote copy of this worktree.
3200 let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
3201 let worktree_id = 1;
3202 let share_request = tree.update(&mut cx, |tree, cx| {
3203 tree.as_local().unwrap().share_request(cx)
3204 });
3205 let open_worktree = server.receive::<proto::OpenWorktree>().await.unwrap();
3206 server
3207 .respond(
3208 open_worktree.receipt(),
3209 proto::OpenWorktreeResponse { worktree_id: 1 },
3210 )
3211 .await;
3212
3213 let remote = Worktree::remote(
3214 proto::JoinWorktreeResponse {
3215 worktree: share_request.await.unwrap().worktree,
3216 replica_id: 1,
3217 collaborators: Vec::new(),
3218 },
3219 Client::new(),
3220 user_store,
3221 Default::default(),
3222 &mut cx.to_async(),
3223 )
3224 .await
3225 .unwrap();
3226
3227 cx.read(|cx| {
3228 assert!(!buffer2.read(cx).is_dirty());
3229 assert!(!buffer3.read(cx).is_dirty());
3230 assert!(!buffer4.read(cx).is_dirty());
3231 assert!(!buffer5.read(cx).is_dirty());
3232 });
3233
3234 // Rename and delete files and directories.
3235 tree.flush_fs_events(&cx).await;
3236 std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
3237 std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
3238 std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap();
3239 std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
3240 tree.flush_fs_events(&cx).await;
3241
3242 let expected_paths = vec![
3243 "a",
3244 "a/file1",
3245 "a/file2.new",
3246 "b",
3247 "d",
3248 "d/file3",
3249 "d/file4",
3250 ];
3251
3252 cx.read(|app| {
3253 assert_eq!(
3254 tree.read(app)
3255 .paths()
3256 .map(|p| p.to_str().unwrap())
3257 .collect::<Vec<_>>(),
3258 expected_paths
3259 );
3260
3261 assert_eq!(id_for_path("a/file2.new", &cx), file2_id);
3262 assert_eq!(id_for_path("d/file3", &cx), file3_id);
3263 assert_eq!(id_for_path("d/file4", &cx), file4_id);
3264
3265 assert_eq!(
3266 buffer2.read(app).file().unwrap().path().as_ref(),
3267 Path::new("a/file2.new")
3268 );
3269 assert_eq!(
3270 buffer3.read(app).file().unwrap().path().as_ref(),
3271 Path::new("d/file3")
3272 );
3273 assert_eq!(
3274 buffer4.read(app).file().unwrap().path().as_ref(),
3275 Path::new("d/file4")
3276 );
3277 assert_eq!(
3278 buffer5.read(app).file().unwrap().path().as_ref(),
3279 Path::new("b/c/file5")
3280 );
3281
3282 assert!(!buffer2.read(app).file().unwrap().is_deleted());
3283 assert!(!buffer3.read(app).file().unwrap().is_deleted());
3284 assert!(!buffer4.read(app).file().unwrap().is_deleted());
3285 assert!(buffer5.read(app).file().unwrap().is_deleted());
3286 });
3287
3288 // Update the remote worktree. Check that it becomes consistent with the
3289 // local worktree.
3290 remote.update(&mut cx, |remote, cx| {
3291 let update_message =
3292 tree.read(cx)
3293 .snapshot()
3294 .build_update(&initial_snapshot, worktree_id, true);
3295 remote
3296 .as_remote_mut()
3297 .unwrap()
3298 .snapshot
3299 .apply_update(update_message)
3300 .unwrap();
3301
3302 assert_eq!(
3303 remote
3304 .paths()
3305 .map(|p| p.to_str().unwrap())
3306 .collect::<Vec<_>>(),
3307 expected_paths
3308 );
3309 });
3310 }
3311
3312 #[gpui::test]
3313 async fn test_rescan_with_gitignore(mut cx: gpui::TestAppContext) {
3314 let dir = temp_tree(json!({
3315 ".git": {},
3316 ".gitignore": "ignored-dir\n",
3317 "tracked-dir": {
3318 "tracked-file1": "tracked contents",
3319 },
3320 "ignored-dir": {
3321 "ignored-file1": "ignored contents",
3322 }
3323 }));
3324
3325 let client = Client::new();
3326 let http_client = FakeHttpClient::with_404_response();
3327 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3328
3329 let tree = Worktree::open_local(
3330 client,
3331 user_store,
3332 dir.path(),
3333 Arc::new(RealFs),
3334 Default::default(),
3335 &mut cx.to_async(),
3336 )
3337 .await
3338 .unwrap();
3339 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3340 .await;
3341 tree.flush_fs_events(&cx).await;
3342 cx.read(|cx| {
3343 let tree = tree.read(cx);
3344 let tracked = tree.entry_for_path("tracked-dir/tracked-file1").unwrap();
3345 let ignored = tree.entry_for_path("ignored-dir/ignored-file1").unwrap();
3346 assert_eq!(tracked.is_ignored, false);
3347 assert_eq!(ignored.is_ignored, true);
3348 });
3349
3350 std::fs::write(dir.path().join("tracked-dir/tracked-file2"), "").unwrap();
3351 std::fs::write(dir.path().join("ignored-dir/ignored-file2"), "").unwrap();
3352 tree.flush_fs_events(&cx).await;
3353 cx.read(|cx| {
3354 let tree = tree.read(cx);
3355 let dot_git = tree.entry_for_path(".git").unwrap();
3356 let tracked = tree.entry_for_path("tracked-dir/tracked-file2").unwrap();
3357 let ignored = tree.entry_for_path("ignored-dir/ignored-file2").unwrap();
3358 assert_eq!(tracked.is_ignored, false);
3359 assert_eq!(ignored.is_ignored, true);
3360 assert_eq!(dot_git.is_ignored, true);
3361 });
3362 }
3363
3364 #[gpui::test]
3365 async fn test_open_and_share_worktree(mut cx: gpui::TestAppContext) {
3366 let user_id = 100;
3367 let mut client = Client::new();
3368 let server = FakeServer::for_client(user_id, &mut client, &cx).await;
3369 let user_store = server.build_user_store(client.clone(), &mut cx).await;
3370
3371 let fs = Arc::new(FakeFs::new());
3372 fs.insert_tree(
3373 "/path",
3374 json!({
3375 "to": {
3376 "the-dir": {
3377 ".zed.toml": r#"collaborators = ["friend-1", "friend-2"]"#,
3378 "a.txt": "a-contents",
3379 },
3380 },
3381 }),
3382 )
3383 .await;
3384
3385 let worktree = Worktree::open_local(
3386 client.clone(),
3387 user_store,
3388 "/path/to/the-dir".as_ref(),
3389 fs,
3390 Default::default(),
3391 &mut cx.to_async(),
3392 )
3393 .await
3394 .unwrap();
3395
3396 let open_worktree = server.receive::<proto::OpenWorktree>().await.unwrap();
3397 assert_eq!(
3398 open_worktree.payload,
3399 proto::OpenWorktree {
3400 root_name: "the-dir".to_string(),
3401 authorized_logins: vec!["friend-1".to_string(), "friend-2".to_string()],
3402 }
3403 );
3404
3405 server
3406 .respond(
3407 open_worktree.receipt(),
3408 proto::OpenWorktreeResponse { worktree_id: 5 },
3409 )
3410 .await;
3411 let remote_id = worktree
3412 .update(&mut cx, |tree, _| tree.as_local().unwrap().next_remote_id())
3413 .await;
3414 assert_eq!(remote_id, Some(5));
3415
3416 cx.update(move |_| drop(worktree));
3417 server.receive::<proto::CloseWorktree>().await.unwrap();
3418 }
3419
3420 #[gpui::test]
3421 async fn test_buffer_is_dirty(mut cx: gpui::TestAppContext) {
3422 use std::fs;
3423
3424 let dir = temp_tree(json!({
3425 "file1": "abc",
3426 "file2": "def",
3427 "file3": "ghi",
3428 }));
3429 let client = Client::new();
3430 let http_client = FakeHttpClient::with_404_response();
3431 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3432
3433 let tree = Worktree::open_local(
3434 client,
3435 user_store,
3436 dir.path(),
3437 Arc::new(RealFs),
3438 Default::default(),
3439 &mut cx.to_async(),
3440 )
3441 .await
3442 .unwrap();
3443 tree.flush_fs_events(&cx).await;
3444 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3445 .await;
3446
3447 let buffer1 = tree
3448 .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
3449 .await
3450 .unwrap();
3451 let events = Rc::new(RefCell::new(Vec::new()));
3452
3453 // initially, the buffer isn't dirty.
3454 buffer1.update(&mut cx, |buffer, cx| {
3455 cx.subscribe(&buffer1, {
3456 let events = events.clone();
3457 move |_, _, event, _| events.borrow_mut().push(event.clone())
3458 })
3459 .detach();
3460
3461 assert!(!buffer.is_dirty());
3462 assert!(events.borrow().is_empty());
3463
3464 buffer.edit(vec![1..2], "", cx);
3465 });
3466
3467 // after the first edit, the buffer is dirty, and emits a dirtied event.
3468 buffer1.update(&mut cx, |buffer, cx| {
3469 assert!(buffer.text() == "ac");
3470 assert!(buffer.is_dirty());
3471 assert_eq!(
3472 *events.borrow(),
3473 &[language::Event::Edited, language::Event::Dirtied]
3474 );
3475 events.borrow_mut().clear();
3476 buffer.did_save(buffer.version(), buffer.file().unwrap().mtime(), None, cx);
3477 });
3478
3479 // after saving, the buffer is not dirty, and emits a saved event.
3480 buffer1.update(&mut cx, |buffer, cx| {
3481 assert!(!buffer.is_dirty());
3482 assert_eq!(*events.borrow(), &[language::Event::Saved]);
3483 events.borrow_mut().clear();
3484
3485 buffer.edit(vec![1..1], "B", cx);
3486 buffer.edit(vec![2..2], "D", cx);
3487 });
3488
3489 // after editing again, the buffer is dirty, and emits another dirty event.
3490 buffer1.update(&mut cx, |buffer, cx| {
3491 assert!(buffer.text() == "aBDc");
3492 assert!(buffer.is_dirty());
3493 assert_eq!(
3494 *events.borrow(),
3495 &[
3496 language::Event::Edited,
3497 language::Event::Dirtied,
3498 language::Event::Edited
3499 ],
3500 );
3501 events.borrow_mut().clear();
3502
3503 // TODO - currently, after restoring the buffer to its
3504 // previously-saved state, the is still considered dirty.
3505 buffer.edit(vec![1..3], "", cx);
3506 assert!(buffer.text() == "ac");
3507 assert!(buffer.is_dirty());
3508 });
3509
3510 assert_eq!(*events.borrow(), &[language::Event::Edited]);
3511
3512 // When a file is deleted, the buffer is considered dirty.
3513 let events = Rc::new(RefCell::new(Vec::new()));
3514 let buffer2 = tree
3515 .update(&mut cx, |tree, cx| tree.open_buffer("file2", cx))
3516 .await
3517 .unwrap();
3518 buffer2.update(&mut cx, |_, cx| {
3519 cx.subscribe(&buffer2, {
3520 let events = events.clone();
3521 move |_, _, event, _| events.borrow_mut().push(event.clone())
3522 })
3523 .detach();
3524 });
3525
3526 fs::remove_file(dir.path().join("file2")).unwrap();
3527 buffer2.condition(&cx, |b, _| b.is_dirty()).await;
3528 assert_eq!(
3529 *events.borrow(),
3530 &[language::Event::Dirtied, language::Event::FileHandleChanged]
3531 );
3532
3533 // When a file is already dirty when deleted, we don't emit a Dirtied event.
3534 let events = Rc::new(RefCell::new(Vec::new()));
3535 let buffer3 = tree
3536 .update(&mut cx, |tree, cx| tree.open_buffer("file3", cx))
3537 .await
3538 .unwrap();
3539 buffer3.update(&mut cx, |_, cx| {
3540 cx.subscribe(&buffer3, {
3541 let events = events.clone();
3542 move |_, _, event, _| events.borrow_mut().push(event.clone())
3543 })
3544 .detach();
3545 });
3546
3547 tree.flush_fs_events(&cx).await;
3548 buffer3.update(&mut cx, |buffer, cx| {
3549 buffer.edit(Some(0..0), "x", cx);
3550 });
3551 events.borrow_mut().clear();
3552 fs::remove_file(dir.path().join("file3")).unwrap();
3553 buffer3
3554 .condition(&cx, |_, _| !events.borrow().is_empty())
3555 .await;
3556 assert_eq!(*events.borrow(), &[language::Event::FileHandleChanged]);
3557 cx.read(|cx| assert!(buffer3.read(cx).is_dirty()));
3558 }
3559
3560 #[gpui::test]
3561 async fn test_buffer_file_changes_on_disk(mut cx: gpui::TestAppContext) {
3562 use std::fs;
3563 use text::{Point, Selection, SelectionGoal};
3564
3565 let initial_contents = "aaa\nbbbbb\nc\n";
3566 let dir = temp_tree(json!({ "the-file": initial_contents }));
3567 let client = Client::new();
3568 let http_client = FakeHttpClient::with_404_response();
3569 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3570
3571 let tree = Worktree::open_local(
3572 client,
3573 user_store,
3574 dir.path(),
3575 Arc::new(RealFs),
3576 Default::default(),
3577 &mut cx.to_async(),
3578 )
3579 .await
3580 .unwrap();
3581 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3582 .await;
3583
3584 let abs_path = dir.path().join("the-file");
3585 let buffer = tree
3586 .update(&mut cx, |tree, cx| {
3587 tree.open_buffer(Path::new("the-file"), cx)
3588 })
3589 .await
3590 .unwrap();
3591
3592 // Add a cursor on each row.
3593 let selection_set_id = buffer.update(&mut cx, |buffer, cx| {
3594 assert!(!buffer.is_dirty());
3595 buffer.add_selection_set(
3596 &(0..3)
3597 .map(|row| Selection {
3598 id: row as usize,
3599 start: Point::new(row, 1),
3600 end: Point::new(row, 1),
3601 reversed: false,
3602 goal: SelectionGoal::None,
3603 })
3604 .collect::<Vec<_>>(),
3605 cx,
3606 )
3607 });
3608
3609 // Change the file on disk, adding two new lines of text, and removing
3610 // one line.
3611 buffer.read_with(&cx, |buffer, _| {
3612 assert!(!buffer.is_dirty());
3613 assert!(!buffer.has_conflict());
3614 });
3615 let new_contents = "AAAA\naaa\nBB\nbbbbb\n";
3616 fs::write(&abs_path, new_contents).unwrap();
3617
3618 // Because the buffer was not modified, it is reloaded from disk. Its
3619 // contents are edited according to the diff between the old and new
3620 // file contents.
3621 buffer
3622 .condition(&cx, |buffer, _| buffer.text() == new_contents)
3623 .await;
3624
3625 buffer.update(&mut cx, |buffer, _| {
3626 assert_eq!(buffer.text(), new_contents);
3627 assert!(!buffer.is_dirty());
3628 assert!(!buffer.has_conflict());
3629
3630 let cursor_positions = buffer
3631 .selection_set(selection_set_id)
3632 .unwrap()
3633 .selections::<Point, _>(&*buffer)
3634 .map(|selection| {
3635 assert_eq!(selection.start, selection.end);
3636 selection.start
3637 })
3638 .collect::<Vec<_>>();
3639 assert_eq!(
3640 cursor_positions,
3641 [Point::new(1, 1), Point::new(3, 1), Point::new(4, 0)]
3642 );
3643 });
3644
3645 // Modify the buffer
3646 buffer.update(&mut cx, |buffer, cx| {
3647 buffer.edit(vec![0..0], " ", cx);
3648 assert!(buffer.is_dirty());
3649 assert!(!buffer.has_conflict());
3650 });
3651
3652 // Change the file on disk again, adding blank lines to the beginning.
3653 fs::write(&abs_path, "\n\n\nAAAA\naaa\nBB\nbbbbb\n").unwrap();
3654
3655 // Because the buffer is modified, it doesn't reload from disk, but is
3656 // marked as having a conflict.
3657 buffer
3658 .condition(&cx, |buffer, _| buffer.has_conflict())
3659 .await;
3660 }
3661
3662 #[gpui::test]
3663 async fn test_language_server_diagnostics(mut cx: gpui::TestAppContext) {
3664 let (language_server_config, mut fake_server) =
3665 LanguageServerConfig::fake(cx.background()).await;
3666 let mut languages = LanguageRegistry::new();
3667 languages.add(Arc::new(Language::new(
3668 LanguageConfig {
3669 name: "Rust".to_string(),
3670 path_suffixes: vec!["rs".to_string()],
3671 language_server: Some(language_server_config),
3672 ..Default::default()
3673 },
3674 Some(tree_sitter_rust::language()),
3675 )));
3676
3677 let dir = temp_tree(json!({
3678 "a.rs": "fn a() { A }",
3679 "b.rs": "const y: i32 = 1",
3680 }));
3681
3682 let client = Client::new();
3683 let http_client = FakeHttpClient::with_404_response();
3684 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));
3685
3686 let tree = Worktree::open_local(
3687 client,
3688 user_store,
3689 dir.path(),
3690 Arc::new(RealFs),
3691 Arc::new(languages),
3692 &mut cx.to_async(),
3693 )
3694 .await
3695 .unwrap();
3696 cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
3697 .await;
3698
3699 // Cause worktree to start the fake language server
3700 let _buffer = tree
3701 .update(&mut cx, |tree, cx| tree.open_buffer("b.rs", cx))
3702 .await
3703 .unwrap();
3704
3705 fake_server
3706 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
3707 uri: Url::from_file_path(dir.path().join("a.rs")).unwrap(),
3708 version: None,
3709 diagnostics: vec![lsp::Diagnostic {
3710 range: lsp::Range::new(lsp::Position::new(0, 9), lsp::Position::new(0, 10)),
3711 severity: Some(lsp::DiagnosticSeverity::ERROR),
3712 message: "undefined variable 'A'".to_string(),
3713 ..Default::default()
3714 }],
3715 })
3716 .await;
3717
3718 let buffer = tree
3719 .update(&mut cx, |tree, cx| tree.open_buffer("a.rs", cx))
3720 .await
3721 .unwrap();
3722
3723 buffer.read_with(&cx, |buffer, _| {
3724 let diagnostics = buffer
3725 .diagnostics_in_range(0..buffer.len())
3726 .collect::<Vec<_>>();
3727 assert_eq!(
3728 diagnostics,
3729 &[(
3730 Point::new(0, 9)..Point::new(0, 10),
3731 &Diagnostic {
3732 severity: lsp::DiagnosticSeverity::ERROR,
3733 message: "undefined variable 'A'".to_string(),
3734 group_id: 0,
3735 is_primary: true
3736 }
3737 )]
3738 )
3739 });
3740 }
3741
3742 #[gpui::test(iterations = 100)]
3743 fn test_random(mut rng: StdRng) {
3744 let operations = env::var("OPERATIONS")
3745 .map(|o| o.parse().unwrap())
3746 .unwrap_or(40);
3747 let initial_entries = env::var("INITIAL_ENTRIES")
3748 .map(|o| o.parse().unwrap())
3749 .unwrap_or(20);
3750
3751 let root_dir = tempdir::TempDir::new("worktree-test").unwrap();
3752 for _ in 0..initial_entries {
3753 randomly_mutate_tree(root_dir.path(), 1.0, &mut rng).unwrap();
3754 }
3755 log::info!("Generated initial tree");
3756
3757 let (notify_tx, _notify_rx) = smol::channel::unbounded();
3758 let fs = Arc::new(RealFs);
3759 let next_entry_id = Arc::new(AtomicUsize::new(0));
3760 let mut initial_snapshot = Snapshot {
3761 id: 0,
3762 scan_id: 0,
3763 abs_path: root_dir.path().into(),
3764 entries_by_path: Default::default(),
3765 entries_by_id: Default::default(),
3766 removed_entry_ids: Default::default(),
3767 ignores: Default::default(),
3768 root_name: Default::default(),
3769 root_char_bag: Default::default(),
3770 next_entry_id: next_entry_id.clone(),
3771 };
3772 initial_snapshot.insert_entry(
3773 Entry::new(
3774 Path::new("").into(),
3775 &smol::block_on(fs.metadata(root_dir.path()))
3776 .unwrap()
3777 .unwrap(),
3778 &next_entry_id,
3779 Default::default(),
3780 ),
3781 fs.as_ref(),
3782 );
3783 let mut scanner = BackgroundScanner::new(
3784 Arc::new(Mutex::new(initial_snapshot.clone())),
3785 notify_tx,
3786 fs.clone(),
3787 Arc::new(gpui::executor::Background::new()),
3788 );
3789 smol::block_on(scanner.scan_dirs()).unwrap();
3790 scanner.snapshot().check_invariants();
3791
3792 let mut events = Vec::new();
3793 let mut snapshots = Vec::new();
3794 let mut mutations_len = operations;
3795 while mutations_len > 1 {
3796 if !events.is_empty() && rng.gen_bool(0.4) {
3797 let len = rng.gen_range(0..=events.len());
3798 let to_deliver = events.drain(0..len).collect::<Vec<_>>();
3799 log::info!("Delivering events: {:#?}", to_deliver);
3800 smol::block_on(scanner.process_events(to_deliver));
3801 scanner.snapshot().check_invariants();
3802 } else {
3803 events.extend(randomly_mutate_tree(root_dir.path(), 0.6, &mut rng).unwrap());
3804 mutations_len -= 1;
3805 }
3806
3807 if rng.gen_bool(0.2) {
3808 snapshots.push(scanner.snapshot());
3809 }
3810 }
3811 log::info!("Quiescing: {:#?}", events);
3812 smol::block_on(scanner.process_events(events));
3813 scanner.snapshot().check_invariants();
3814
3815 let (notify_tx, _notify_rx) = smol::channel::unbounded();
3816 let mut new_scanner = BackgroundScanner::new(
3817 Arc::new(Mutex::new(initial_snapshot)),
3818 notify_tx,
3819 scanner.fs.clone(),
3820 scanner.executor.clone(),
3821 );
3822 smol::block_on(new_scanner.scan_dirs()).unwrap();
3823 assert_eq!(
3824 scanner.snapshot().to_vec(true),
3825 new_scanner.snapshot().to_vec(true)
3826 );
3827
3828 for mut prev_snapshot in snapshots {
3829 let include_ignored = rng.gen::<bool>();
3830 if !include_ignored {
3831 let mut entries_by_path_edits = Vec::new();
3832 let mut entries_by_id_edits = Vec::new();
3833 for entry in prev_snapshot
3834 .entries_by_id
3835 .cursor::<()>()
3836 .filter(|e| e.is_ignored)
3837 {
3838 entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
3839 entries_by_id_edits.push(Edit::Remove(entry.id));
3840 }
3841
3842 prev_snapshot
3843 .entries_by_path
3844 .edit(entries_by_path_edits, &());
3845 prev_snapshot.entries_by_id.edit(entries_by_id_edits, &());
3846 }
3847
3848 let update = scanner
3849 .snapshot()
3850 .build_update(&prev_snapshot, 0, include_ignored);
3851 prev_snapshot.apply_update(update).unwrap();
3852 assert_eq!(
3853 prev_snapshot.to_vec(true),
3854 scanner.snapshot().to_vec(include_ignored)
3855 );
3856 }
3857 }
3858
// Applies one randomly chosen mutation to the directory tree rooted at
// `root_path` and returns an `fsevent::Event` for each path it recorded.
//
// The mutation is one of:
// - creating a new file or directory (always chosen when the tree holds
//   only the root directory and no files, otherwise with probability
//   `insertion_probability`),
// - writing a `.gitignore` into a randomly chosen directory,
// - renaming a file or directory, optionally replacing an existing directory,
// - deleting a file or directory.
//
// Filesystem failures surface either as an `Err` (where `?` is used) or as a
// panic (where `unwrap()` is used).
fn randomly_mutate_tree(
    root_path: &Path,
    insertion_probability: f64,
    rng: &mut impl Rng,
) -> Result<Vec<fsevent::Event>> {
    let root_path = root_path.canonicalize().unwrap();
    // `dirs[0]` is the root itself: `read_dir_recursive` puts its argument first.
    let (dirs, files) = read_dir_recursive(root_path.clone());

    let mut events = Vec::new();
    // Records an event for `path`, using the current time in whole seconds
    // since the Unix epoch as the event id.
    let mut record_event = |path: PathBuf| {
        events.push(fsevent::Event {
            event_id: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            flags: fsevent::StreamFlags::empty(),
            path,
        });
    };

    // The RNG is only consulted here when the tree is not empty.
    if (files.is_empty() && dirs.len() == 1) || rng.gen_bool(insertion_probability) {
        // Create a new entry with a random name inside a random existing directory.
        let path = dirs.choose(rng).unwrap();
        let new_path = path.join(gen_name(rng));

        if rng.gen() {
            log::info!("Creating dir {:?}", new_path.strip_prefix(root_path)?);
            std::fs::create_dir(&new_path)?;
        } else {
            log::info!("Creating file {:?}", new_path.strip_prefix(root_path)?);
            std::fs::write(&new_path, "")?;
        }
        record_event(new_path);
    } else if rng.gen_bool(0.05) {
        // Write a `.gitignore` listing a random subset of the files and
        // directories found beneath the chosen directory.
        let ignore_dir_path = dirs.choose(rng).unwrap();
        let ignore_path = ignore_dir_path.join(&*GITIGNORE);

        let (subdirs, subfiles) = read_dir_recursive(ignore_dir_path.clone());
        let files_to_ignore = {
            let len = rng.gen_range(0..=subfiles.len());
            subfiles.choose_multiple(rng, len)
        };
        let dirs_to_ignore = {
            // Exclusive upper bound: `subdirs` includes `ignore_dir_path`
            // itself, so strictly fewer than all of them are requested.
            let len = rng.gen_range(0..subdirs.len());
            subdirs.choose_multiple(rng, len)
        };

        // One path per line, relative to the directory holding the ignore file.
        let mut ignore_contents = String::new();
        for path_to_ignore in files_to_ignore.chain(dirs_to_ignore) {
            write!(
                ignore_contents,
                "{}\n",
                path_to_ignore
                    .strip_prefix(&ignore_dir_path)?
                    .to_str()
                    .unwrap()
            )
            .unwrap();
        }
        log::info!(
            "Creating {:?} with contents:\n{}",
            ignore_path.strip_prefix(&root_path)?,
            ignore_contents
        );
        std::fs::write(&ignore_path, ignore_contents).unwrap();
        record_event(ignore_path);
    } else {
        // Pick the entry to rename or delete: a random file or a random
        // non-root directory (`dirs[1..]` skips the root).
        let old_path = {
            let file_path = files.choose(rng);
            let dir_path = dirs[1..].choose(rng);
            file_path.into_iter().chain(dir_path).choose(rng).unwrap()
        };

        let is_rename = rng.gen();
        if is_rename {
            // The destination parent must not be `old_path` or lie beneath it.
            let new_path_parent = dirs
                .iter()
                .filter(|d| !d.starts_with(old_path))
                .choose(rng)
                .unwrap();

            // Occasionally replace the chosen directory itself instead of
            // creating a new child in it, but never when that directory is
            // an ancestor of `old_path`.
            let overwrite_existing_dir =
                !old_path.starts_with(&new_path_parent) && rng.gen_bool(0.3);
            let new_path = if overwrite_existing_dir {
                // Best effort: a failure to remove the directory is ignored here.
                std::fs::remove_dir_all(&new_path_parent).ok();
                new_path_parent.to_path_buf()
            } else {
                new_path_parent.join(gen_name(rng))
            };

            log::info!(
                "Renaming {:?} to {}{:?}",
                old_path.strip_prefix(&root_path)?,
                if overwrite_existing_dir {
                    "overwrite "
                } else {
                    ""
                },
                new_path.strip_prefix(&root_path)?
            );
            std::fs::rename(&old_path, &new_path)?;
            record_event(old_path.clone());
            record_event(new_path);
        } else if old_path.is_dir() {
            // List the contents before deleting so an event can be recorded
            // for every removed file and directory.
            let (dirs, files) = read_dir_recursive(old_path.clone());

            log::info!("Deleting dir {:?}", old_path.strip_prefix(&root_path)?);
            std::fs::remove_dir_all(&old_path).unwrap();
            for file in files {
                record_event(file);
            }
            for dir in dirs {
                record_event(dir);
            }
        } else {
            log::info!("Deleting file {:?}", old_path.strip_prefix(&root_path)?);
            std::fs::remove_file(old_path).unwrap();
            record_event(old_path.clone());
        }
    }

    Ok(events)
}
3981
// Walks the directory at `path` depth-first and returns two lists: every
// directory encountered (beginning with `path` itself) and every entry that
// is not a directory. Panics if a directory or one of its entries cannot
// be read.
fn read_dir_recursive(path: PathBuf) -> (Vec<PathBuf>, Vec<PathBuf>) {
    // Appends `dir` and everything beneath it to the two accumulators.
    fn visit(dir: PathBuf, dirs: &mut Vec<PathBuf>, files: &mut Vec<PathBuf>) {
        // Open the listing before recording `dir`, since `dir` is moved
        // into `dirs` right after.
        let listing = std::fs::read_dir(&dir).unwrap();
        dirs.push(dir);
        for child in listing {
            let child_path = child.unwrap().path();
            if child_path.is_dir() {
                visit(child_path, dirs, files);
            } else {
                files.push(child_path);
            }
        }
    }

    let mut dirs = Vec::new();
    let mut files = Vec::new();
    visit(path, &mut dirs, &mut files);
    (dirs, files)
}
3998
3999 fn gen_name(rng: &mut impl Rng) -> String {
4000 (0..6)
4001 .map(|_| rng.sample(rand::distributions::Alphanumeric))
4002 .map(char::from)
4003 .collect()
4004 }
4005
4006 impl Snapshot {
4007 fn check_invariants(&self) {
4008 let mut files = self.files(true, 0);
4009 let mut visible_files = self.files(false, 0);
4010 for entry in self.entries_by_path.cursor::<()>() {
4011 if entry.is_file() {
4012 assert_eq!(files.next().unwrap().inode, entry.inode);
4013 if !entry.is_ignored {
4014 assert_eq!(visible_files.next().unwrap().inode, entry.inode);
4015 }
4016 }
4017 }
4018 assert!(files.next().is_none());
4019 assert!(visible_files.next().is_none());
4020
4021 let mut bfs_paths = Vec::new();
4022 let mut stack = vec![Path::new("")];
4023 while let Some(path) = stack.pop() {
4024 bfs_paths.push(path);
4025 let ix = stack.len();
4026 for child_entry in self.child_entries(path) {
4027 stack.insert(ix, &child_entry.path);
4028 }
4029 }
4030
4031 let dfs_paths = self
4032 .entries_by_path
4033 .cursor::<()>()
4034 .map(|e| e.path.as_ref())
4035 .collect::<Vec<_>>();
4036 assert_eq!(bfs_paths, dfs_paths);
4037
4038 for (ignore_parent_path, _) in &self.ignores {
4039 assert!(self.entry_for_path(ignore_parent_path).is_some());
4040 assert!(self
4041 .entry_for_path(ignore_parent_path.join(&*GITIGNORE))
4042 .is_some());
4043 }
4044 }
4045
4046 fn to_vec(&self, include_ignored: bool) -> Vec<(&Path, u64, bool)> {
4047 let mut paths = Vec::new();
4048 for entry in self.entries_by_path.cursor::<()>() {
4049 if include_ignored || !entry.is_ignored {
4050 paths.push((entry.path.as_ref(), entry.inode, entry.is_ignored));
4051 }
4052 }
4053 paths.sort_by(|a, b| a.0.cmp(&b.0));
4054 paths
4055 }
4056 }
4057}