1use std::{
2 io::{BufRead, BufReader},
3 path::{Path, PathBuf},
4 pin::pin,
5 sync::{Arc, atomic::AtomicUsize},
6};
7
8use anyhow::{Context as _, Result, anyhow};
9use collections::{HashMap, HashSet};
10use fs::{Fs, copy_recursive};
11use futures::{
12 FutureExt, SinkExt,
13 future::{BoxFuture, Shared},
14};
15use gpui::{
16 App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity,
17};
18use postage::oneshot;
19use rpc::{
20 AnyProtoClient, ErrorExt, TypedEnvelope,
21 proto::{self, REMOTE_SERVER_PROJECT_ID},
22};
23use smol::{
24 channel::{Receiver, Sender},
25 stream::StreamExt,
26};
27use text::ReplicaId;
28use util::{
29 ResultExt,
30 paths::{PathStyle, RemotePathBuf, SanitizedPath},
31 rel_path::RelPath,
32};
33use worktree::{
34 CreatedEntry, Entry, ProjectEntryId, UpdatedEntriesSet, UpdatedGitRepositoriesSet, Worktree,
35 WorktreeId, WorktreeSettings,
36};
37
38use crate::{ProjectPath, search::SearchQuery};
39
/// A candidate file handed to the content-filter workers during a project search.
struct MatchingEntry {
    /// Absolute path of the root of the worktree that contains the candidate.
    worktree_root: Arc<Path>,
    /// Project path (worktree id plus worktree-relative path) of the candidate.
    path: ProjectPath,
    /// One-shot channel on which `path` is sent back when the file's content matches the query.
    respond: oneshot::Sender<ProjectPath>,
}
45
/// Backend-specific state of a [`WorktreeStore`].
enum WorktreeStoreState {
    /// The store operates directly on a file system.
    Local {
        /// File system used for copying, renaming and creating local worktrees.
        fs: Arc<dyn Fs>,
    },
    /// The store forwards operations to an upstream peer.
    Remote {
        /// Client used to send requests upstream.
        upstream_client: AnyProtoClient,
        /// Project id placed in requests sent upstream.
        upstream_project_id: u64,
        /// Path style reported by [`WorktreeStore::path_style`] for this store.
        path_style: PathStyle,
    },
}
56
/// Owns the set of worktrees that belong to a project and keeps peers informed about them.
pub struct WorktreeStore {
    /// Shared counter handed to every local worktree created by this store.
    next_entry_id: Arc<AtomicUsize>,
    /// Client and project id that project updates are sent to, if any.
    downstream_client: Option<(AnyProtoClient, u64)>,
    /// When `true`, every worktree is held with a strong handle; otherwise only visible ones are.
    retain_worktrees: bool,
    /// Handles to the worktrees, in display order.
    worktrees: Vec<WorktreeHandle>,
    /// Set once the order was changed explicitly; new worktrees are then appended instead of
    /// being inserted by absolute path.
    worktrees_reordered: bool,
    /// In-flight worktree creations keyed by sanitized absolute path, shared between callers
    /// that request the same path.
    #[allow(clippy::type_complexity)]
    loading_worktrees:
        HashMap<Arc<SanitizedPath>, Shared<Task<Result<Entity<Worktree>, Arc<anyhow::Error>>>>>,
    /// Local or remote backend state.
    state: WorktreeStoreState,
}
68
/// Events emitted by [`WorktreeStore`].
#[derive(Debug)]
pub enum WorktreeStoreEvent {
    /// A worktree was registered with the store.
    WorktreeAdded(Entity<Worktree>),
    /// A worktree was removed from the store, or its entity was released.
    WorktreeRemoved(EntityId, WorktreeId),
    /// A worktree entity registered with the store was released.
    WorktreeReleased(EntityId, WorktreeId),
    /// The order of the worktrees was changed.
    WorktreeOrderChanged,
    /// Update observation for the downstream client was (re)installed on the worktree.
    WorktreeUpdateSent(Entity<Worktree>),
    /// Forwarded `worktree::Event::UpdatedEntries` of the given worktree.
    WorktreeUpdatedEntries(WorktreeId, UpdatedEntriesSet),
    /// Forwarded `worktree::Event::UpdatedGitRepositories` of the given worktree.
    WorktreeUpdatedGitRepositories(WorktreeId, UpdatedGitRepositoriesSet),
    /// Forwarded `worktree::Event::DeletedEntry` of the given worktree.
    WorktreeDeletedEntry(WorktreeId, ProjectEntryId),
}
80
// Allows `WorktreeStore` to emit `WorktreeStoreEvent`s through `cx.emit`.
impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
82
83impl WorktreeStore {
    /// Registers the store's project-entry request handlers (create, copy, delete, expand and
    /// expand-all) on `client`.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_request_handler(Self::handle_create_project_entry);
        client.add_entity_request_handler(Self::handle_copy_project_entry);
        client.add_entity_request_handler(Self::handle_delete_project_entry);
        client.add_entity_request_handler(Self::handle_expand_project_entry);
        client.add_entity_request_handler(Self::handle_expand_all_for_project_entry);
    }
91
92 pub fn local(retain_worktrees: bool, fs: Arc<dyn Fs>) -> Self {
93 Self {
94 next_entry_id: Default::default(),
95 loading_worktrees: Default::default(),
96 downstream_client: None,
97 worktrees: Vec::new(),
98 worktrees_reordered: false,
99 retain_worktrees,
100 state: WorktreeStoreState::Local { fs },
101 }
102 }
103
104 pub fn remote(
105 retain_worktrees: bool,
106 upstream_client: AnyProtoClient,
107 upstream_project_id: u64,
108 path_style: PathStyle,
109 ) -> Self {
110 Self {
111 next_entry_id: Default::default(),
112 loading_worktrees: Default::default(),
113 downstream_client: None,
114 worktrees: Vec::new(),
115 worktrees_reordered: false,
116 retain_worktrees,
117 state: WorktreeStoreState::Remote {
118 upstream_client,
119 upstream_project_id,
120 path_style,
121 },
122 }
123 }
124
125 /// Iterates through all worktrees, including ones that don't appear in the project panel
126 pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Entity<Worktree>> {
127 self.worktrees
128 .iter()
129 .filter_map(move |worktree| worktree.upgrade())
130 }
131
132 /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
133 pub fn visible_worktrees<'a>(
134 &'a self,
135 cx: &'a App,
136 ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
137 self.worktrees()
138 .filter(|worktree| worktree.read(cx).is_visible())
139 }
140
141 pub fn worktree_for_id(&self, id: WorktreeId, cx: &App) -> Option<Entity<Worktree>> {
142 self.worktrees()
143 .find(|worktree| worktree.read(cx).id() == id)
144 }
145
146 pub fn worktree_for_entry(
147 &self,
148 entry_id: ProjectEntryId,
149 cx: &App,
150 ) -> Option<Entity<Worktree>> {
151 self.worktrees()
152 .find(|worktree| worktree.read(cx).contains_entry(entry_id))
153 }
154
155 pub fn find_worktree(
156 &self,
157 abs_path: impl AsRef<Path>,
158 cx: &App,
159 ) -> Option<(Entity<Worktree>, Arc<RelPath>)> {
160 let abs_path = SanitizedPath::new(abs_path.as_ref());
161 for tree in self.worktrees() {
162 let path_style = tree.read(cx).path_style();
163 if let Ok(relative_path) = abs_path.as_ref().strip_prefix(tree.read(cx).abs_path())
164 && let Ok(relative_path) = RelPath::new(relative_path, path_style)
165 {
166 return Some((tree.clone(), relative_path.into_arc()));
167 }
168 }
169 None
170 }
171
172 pub fn absolutize(&self, project_path: &ProjectPath, cx: &App) -> Option<PathBuf> {
173 let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
174 Some(worktree.read(cx).absolutize(&project_path.path))
175 }
176
177 pub fn path_style(&self) -> PathStyle {
178 match &self.state {
179 WorktreeStoreState::Local { .. } => PathStyle::local(),
180 WorktreeStoreState::Remote { path_style, .. } => *path_style,
181 }
182 }
183
184 pub fn find_or_create_worktree(
185 &mut self,
186 abs_path: impl AsRef<Path>,
187 visible: bool,
188 cx: &mut Context<Self>,
189 ) -> Task<Result<(Entity<Worktree>, Arc<RelPath>)>> {
190 let abs_path = abs_path.as_ref();
191 if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
192 Task::ready(Ok((tree, relative_path)))
193 } else {
194 let worktree = self.create_worktree(abs_path, visible, cx);
195 cx.background_spawn(async move { Ok((worktree.await?, RelPath::empty().into())) })
196 }
197 }
198
199 pub fn entry_for_id<'a>(&'a self, entry_id: ProjectEntryId, cx: &'a App) -> Option<&'a Entry> {
200 self.worktrees()
201 .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
202 }
203
204 pub fn worktree_and_entry_for_id<'a>(
205 &'a self,
206 entry_id: ProjectEntryId,
207 cx: &'a App,
208 ) -> Option<(Entity<Worktree>, &'a Entry)> {
209 self.worktrees().find_map(|worktree| {
210 worktree
211 .read(cx)
212 .entry_for_id(entry_id)
213 .map(|e| (worktree.clone(), e))
214 })
215 }
216
217 pub fn entry_for_path<'a>(&'a self, path: &ProjectPath, cx: &'a App) -> Option<&'a Entry> {
218 self.worktree_for_id(path.worktree_id, cx)?
219 .read(cx)
220 .entry_for_path(&path.path)
221 }
222
    /// Copies the entry `entry_id` to `new_project_path`.
    ///
    /// For a local store the copy is done recursively on the file system on the background
    /// executor, after which the destination path is refreshed in the destination worktree.
    /// For a remote store a `proto::CopyProjectEntry` request is sent upstream and the entry
    /// from the response, if any, is inserted into the destination worktree.
    ///
    /// # Errors
    ///
    /// Resolves immediately to an error when the source worktree, the source entry, or the
    /// destination worktree cannot be found. Errors from the copy, the request, or the
    /// worktree update are propagated through the returned task.
    pub fn copy_entry(
        &mut self,
        entry_id: ProjectEntryId,
        new_project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<Option<Entry>>> {
        let Some(old_worktree) = self.worktree_for_entry(entry_id, cx) else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };
        let Some(old_entry) = old_worktree.read(cx).entry_for_id(entry_id) else {
            return Task::ready(Err(anyhow!("no such entry")));
        };
        let Some(new_worktree) = self.worktree_for_id(new_project_path.worktree_id, cx) else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        match &self.state {
            WorktreeStoreState::Local { fs } => {
                // Resolve both absolute paths up front; the copy itself runs off the main thread.
                let old_abs_path = old_worktree.read(cx).absolutize(&old_entry.path);
                let new_abs_path = new_worktree.read(cx).absolutize(&new_project_path.path);
                let fs = fs.clone();
                let copy = cx.background_spawn(async move {
                    copy_recursive(
                        fs.as_ref(),
                        &old_abs_path,
                        &new_abs_path,
                        Default::default(),
                    )
                    .await
                });

                cx.spawn(async move |_, cx| {
                    copy.await?;
                    // Refresh the destination so the new entry is picked up by the worktree.
                    new_worktree
                        .update(cx, |this, cx| {
                            this.as_local_mut().unwrap().refresh_entry(
                                new_project_path.path,
                                None,
                                cx,
                            )
                        })?
                        .await
                })
            }
            WorktreeStoreState::Remote {
                upstream_client,
                upstream_project_id,
                ..
            } => {
                let response = upstream_client.request(proto::CopyProjectEntry {
                    project_id: *upstream_project_id,
                    entry_id: entry_id.to_proto(),
                    new_path: new_project_path.path.to_proto(),
                    new_worktree_id: new_project_path.worktree_id.to_proto(),
                });
                cx.spawn(async move |_, cx| {
                    let response = response.await?;
                    match response.entry {
                        // Mirror the entry reported by the upstream peer into the remote worktree.
                        Some(entry) => new_worktree
                            .update(cx, |worktree, cx| {
                                worktree.as_remote_mut().unwrap().insert_entry(
                                    entry,
                                    response.worktree_scan_id as usize,
                                    cx,
                                )
                            })?
                            .await
                            .map(Some),
                        None => Ok(None),
                    }
                })
            }
        }
    }
297
    /// Renames (moves) the entry `entry_id` to `new_project_path`.
    ///
    /// For a local store the rename is performed on the file system on the background
    /// executor and the destination worktree is refreshed afterwards. For a remote store a
    /// `proto::RenameProjectEntry` request is sent upstream and the entry from the response
    /// is inserted into the destination worktree.
    ///
    /// Resolves to `CreatedEntry::Included` when an entry is available after the rename and
    /// to `CreatedEntry::Excluded`, carrying the absolute destination path, otherwise.
    ///
    /// # Errors
    ///
    /// Resolves immediately to an error when the source worktree, the source entry, or the
    /// destination worktree cannot be found, or when the worktree root is being renamed and
    /// its absolute path has no parent. Errors from the rename, the request, or the worktree
    /// update are propagated through the returned task.
    pub fn rename_entry(
        &mut self,
        entry_id: ProjectEntryId,
        new_project_path: ProjectPath,
        cx: &mut Context<Self>,
    ) -> Task<Result<CreatedEntry>> {
        let Some(old_worktree) = self.worktree_for_entry(entry_id, cx) else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };
        let Some(old_entry) = old_worktree.read(cx).entry_for_id(entry_id).cloned() else {
            return Task::ready(Err(anyhow!("no such entry")));
        };
        let Some(new_worktree) = self.worktree_for_id(new_project_path.worktree_id, cx) else {
            return Task::ready(Err(anyhow!("no such worktree")));
        };

        match &self.state {
            WorktreeStoreState::Local { fs } => {
                let abs_old_path = old_worktree.read(cx).absolutize(&old_entry.path);
                let new_worktree_ref = new_worktree.read(cx);
                let is_root_entry = new_worktree_ref
                    .root_entry()
                    .is_some_and(|e| e.id == entry_id);
                // Renaming the worktree root: the new path is resolved against the parent of
                // the worktree's absolute path rather than against the worktree itself.
                let abs_new_path = if is_root_entry {
                    let abs_path = new_worktree_ref.abs_path();
                    let Some(root_parent_path) = abs_path.parent() else {
                        return Task::ready(Err(anyhow!("no parent for path {:?}", abs_path)));
                    };
                    root_parent_path.join(new_project_path.path.as_std_path())
                } else {
                    new_worktree_ref.absolutize(&new_project_path.path)
                };

                let fs = fs.clone();
                let case_sensitive = new_worktree
                    .read(cx)
                    .as_local()
                    .unwrap()
                    .fs_is_case_sensitive();

                // Single rename attempt with the given `overwrite` flag; used for both the
                // first try and the retry after creating the parent directory.
                let do_rename =
                    async move |fs: &dyn Fs, old_path: &Path, new_path: &Path, overwrite| {
                        fs.rename(
                            &old_path,
                            &new_path,
                            fs::RenameOptions {
                                overwrite,
                                ..fs::RenameOptions::default()
                            },
                        )
                        .await
                        .with_context(|| format!("renaming {old_path:?} into {new_path:?}"))
                    };

                let rename = cx.background_spawn({
                    let abs_new_path = abs_new_path.clone();
                    async move {
                        // If we're on a case-insensitive FS and we're doing a case-only rename (i.e. `foobar` to `FOOBAR`)
                        // we want to overwrite, because otherwise we run into a file-already-exists error.
                        let overwrite = !case_sensitive
                            && abs_old_path != abs_new_path
                            && abs_old_path.to_str().map(|p| p.to_lowercase())
                                == abs_new_path.to_str().map(|p| p.to_lowercase());

                        // The directory we're renaming into might not exist yet
                        if let Err(e) =
                            do_rename(fs.as_ref(), &abs_old_path, &abs_new_path, overwrite).await
                        {
                            // Only a "not found" I/O error triggers the create-parent-and-retry path.
                            if let Some(err) = e.downcast_ref::<std::io::Error>()
                                && err.kind() == std::io::ErrorKind::NotFound
                            {
                                if let Some(parent) = abs_new_path.parent() {
                                    fs.create_dir(parent).await.with_context(|| {
                                        format!("creating parent directory {parent:?}")
                                    })?;
                                    return do_rename(
                                        fs.as_ref(),
                                        &abs_old_path,
                                        &abs_new_path,
                                        overwrite,
                                    )
                                    .await;
                                }
                            }
                            return Err(e);
                        }
                        Ok(())
                    }
                });

                cx.spawn(async move |_, cx| {
                    rename.await?;
                    Ok(new_worktree
                        .update(cx, |this, cx| {
                            let local = this.as_local_mut().unwrap();
                            if is_root_entry {
                                // We eagerly update `abs_path` and refresh this worktree.
                                // Otherwise, the FS watcher would do it on the `RootUpdated` event,
                                // but with a noticeable delay, so we handle it proactively.
                                local.update_abs_path_and_refresh(
                                    Some(SanitizedPath::new_arc(&abs_new_path)),
                                    cx,
                                );
                                Task::ready(Ok(this.root_entry().cloned()))
                            } else {
                                // First refresh the parent directory (in case it was newly created)
                                if let Some(parent) = new_project_path.path.parent() {
                                    let _ = local.refresh_entries_for_paths(vec![parent.into()]);
                                }
                                // Then refresh the new path
                                local.refresh_entry(
                                    new_project_path.path.clone(),
                                    Some(old_entry.path),
                                    cx,
                                )
                            }
                        })?
                        .await?
                        // No entry after the refresh means the destination is reported as excluded.
                        .map(CreatedEntry::Included)
                        .unwrap_or_else(|| CreatedEntry::Excluded {
                            abs_path: abs_new_path,
                        }))
                })
            }
            WorktreeStoreState::Remote {
                upstream_client,
                upstream_project_id,
                ..
            } => {
                let response = upstream_client.request(proto::RenameProjectEntry {
                    project_id: *upstream_project_id,
                    entry_id: entry_id.to_proto(),
                    new_path: new_project_path.path.to_proto(),
                    new_worktree_id: new_project_path.worktree_id.to_proto(),
                });
                cx.spawn(async move |_, cx| {
                    let response = response.await?;
                    match response.entry {
                        // Mirror the entry reported by the upstream peer into the remote worktree.
                        Some(entry) => new_worktree
                            .update(cx, |worktree, cx| {
                                worktree.as_remote_mut().unwrap().insert_entry(
                                    entry,
                                    response.worktree_scan_id as usize,
                                    cx,
                                )
                            })?
                            .await
                            .map(CreatedEntry::Included),
                        // No entry in the response: report the destination as excluded.
                        None => {
                            let abs_path = new_worktree.read_with(cx, |worktree, _| {
                                worktree.absolutize(&new_project_path.path)
                            })?;
                            Ok(CreatedEntry::Excluded { abs_path })
                        }
                    }
                })
            }
        }
    }
    /// Creates a worktree rooted at `abs_path`.
    ///
    /// Creation requests are deduplicated by sanitized path: while a creation for a path is
    /// in flight, further calls for the same path await the same shared task. The entry in
    /// `loading_worktrees` is removed once the task has finished.
    ///
    /// # Errors
    ///
    /// For a remote store whose client is connected via collab the task fails with
    /// "cannot create worktrees via collab". Errors from the underlying local or remote
    /// creation are propagated.
    pub fn create_worktree(
        &mut self,
        abs_path: impl AsRef<Path>,
        visible: bool,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Worktree>>> {
        let abs_path: Arc<SanitizedPath> = SanitizedPath::new_arc(&abs_path);
        if !self.loading_worktrees.contains_key(&abs_path) {
            let task = match &self.state {
                WorktreeStoreState::Remote {
                    upstream_client,
                    path_style,
                    ..
                } => {
                    if upstream_client.is_via_collab() {
                        Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
                    } else {
                        let abs_path = RemotePathBuf::new(abs_path.to_string(), *path_style);
                        self.create_remote_worktree(upstream_client.clone(), abs_path, visible, cx)
                    }
                }
                WorktreeStoreState::Local { fs } => {
                    self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
                }
            };

            // Share the task so that concurrent callers for the same path can all await it.
            self.loading_worktrees
                .insert(abs_path.clone(), task.shared());
        }
        let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
        cx.spawn(async move |this, cx| {
            let result = task.await;
            // Best-effort cleanup; the store may already have been dropped.
            this.update(cx, |this, _| this.loading_worktrees.remove(&abs_path))
                .ok();
            match result {
                Ok(worktree) => Ok(worktree),
                // The shared task yields `Arc<anyhow::Error>`; turn it back into an owned error.
                Err(err) => Err((*err).cloned()),
            }
        })
    }
497
498 fn create_remote_worktree(
499 &mut self,
500 client: AnyProtoClient,
501 abs_path: RemotePathBuf,
502 visible: bool,
503 cx: &mut Context<Self>,
504 ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
505 let path_style = abs_path.path_style();
506 let mut abs_path = abs_path.to_string();
507 // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
508 // in which case want to strip the leading the `/`.
509 // On the host-side, the `~` will get expanded.
510 // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
511 if abs_path.starts_with("/~") {
512 abs_path = abs_path[1..].to_string();
513 }
514 if abs_path.is_empty() {
515 abs_path = "~/".to_string();
516 }
517
518 cx.spawn(async move |this, cx| {
519 let this = this.upgrade().context("Dropped worktree store")?;
520
521 let path = RemotePathBuf::new(abs_path, path_style);
522 let response = client
523 .request(proto::AddWorktree {
524 project_id: REMOTE_SERVER_PROJECT_ID,
525 path: path.to_proto(),
526 visible,
527 })
528 .await?;
529
530 if let Some(existing_worktree) = this.read_with(cx, |this, cx| {
531 this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
532 })? {
533 return Ok(existing_worktree);
534 }
535
536 let root_path_buf = PathBuf::from(response.canonicalized_path.clone());
537 let root_name = root_path_buf
538 .file_name()
539 .map(|n| n.to_string_lossy().into_owned())
540 .unwrap_or(root_path_buf.to_string_lossy().into_owned());
541
542 let worktree = cx.update(|cx| {
543 Worktree::remote(
544 REMOTE_SERVER_PROJECT_ID,
545 0,
546 proto::WorktreeMetadata {
547 id: response.worktree_id,
548 root_name,
549 visible,
550 abs_path: response.canonicalized_path,
551 },
552 client,
553 path_style,
554 cx,
555 )
556 })?;
557
558 this.update(cx, |this, cx| {
559 this.add(&worktree, cx);
560 })?;
561 Ok(worktree)
562 })
563 }
564
565 fn create_local_worktree(
566 &mut self,
567 fs: Arc<dyn Fs>,
568 abs_path: Arc<SanitizedPath>,
569 visible: bool,
570 cx: &mut Context<Self>,
571 ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
572 let next_entry_id = self.next_entry_id.clone();
573
574 cx.spawn(async move |this, cx| {
575 let worktree = Worktree::local(
576 SanitizedPath::cast_arc(abs_path.clone()),
577 visible,
578 fs,
579 next_entry_id,
580 cx,
581 )
582 .await;
583
584 let worktree = worktree?;
585
586 this.update(cx, |this, cx| this.add(&worktree, cx))?;
587
588 if visible {
589 cx.update(|cx| {
590 cx.add_recent_document(abs_path.as_path());
591 })
592 .log_err();
593 }
594
595 Ok(worktree)
596 })
597 }
598
    /// Registers `worktree` with the store.
    ///
    /// The worktree is held strongly when `retain_worktrees` is set or the worktree is
    /// visible, and weakly otherwise. Emits `WorktreeAdded`, sends project updates, forwards
    /// the worktree's events as store events, and emits `WorktreeReleased` followed by
    /// `WorktreeRemoved` when the worktree entity is released.
    ///
    /// In debug builds this asserts that no worktree with the same id is already registered.
    pub fn add(&mut self, worktree: &Entity<Worktree>, cx: &mut Context<Self>) {
        let worktree_id = worktree.read(cx).id();
        debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));

        let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
        let handle = if push_strong_handle {
            WorktreeHandle::Strong(worktree.clone())
        } else {
            WorktreeHandle::Weak(worktree.downgrade())
        };
        if self.worktrees_reordered {
            // After a manual reorder, new worktrees simply go to the end.
            self.worktrees.push(handle);
        } else {
            // Otherwise insert at the position found by a binary search on absolute path;
            // handles that can no longer be upgraded compare as `None`.
            let i = match self
                .worktrees
                .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
                    other.upgrade().map(|worktree| worktree.read(cx).abs_path())
                }) {
                Ok(i) | Err(i) => i,
            };
            self.worktrees.insert(i, handle);
        }

        cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
        self.send_project_updates(cx);

        let handle_id = worktree.entity_id();
        // Re-emit the worktree's own events as store-level events tagged with its id.
        cx.subscribe(worktree, |_, worktree, event, cx| {
            let worktree_id = worktree.read(cx).id();
            match event {
                worktree::Event::UpdatedEntries(changes) => {
                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedEntries(
                        worktree_id,
                        changes.clone(),
                    ));
                }
                worktree::Event::UpdatedGitRepositories(set) => {
                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedGitRepositories(
                        worktree_id,
                        set.clone(),
                    ));
                }
                worktree::Event::DeletedEntry(id) => {
                    cx.emit(WorktreeStoreEvent::WorktreeDeletedEntry(worktree_id, *id))
                }
            }
        })
        .detach();
        // When the worktree entity is released, announce it and refresh downstream state.
        cx.observe_release(worktree, move |this, worktree, cx| {
            cx.emit(WorktreeStoreEvent::WorktreeReleased(
                handle_id,
                worktree.id(),
            ));
            cx.emit(WorktreeStoreEvent::WorktreeRemoved(
                handle_id,
                worktree.id(),
            ));
            this.send_project_updates(cx);
        })
        .detach();
    }
660
661 pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut Context<Self>) {
662 self.worktrees.retain(|worktree| {
663 if let Some(worktree) = worktree.upgrade() {
664 if worktree.read(cx).id() == id_to_remove {
665 cx.emit(WorktreeStoreEvent::WorktreeRemoved(
666 worktree.entity_id(),
667 id_to_remove,
668 ));
669 false
670 } else {
671 true
672 }
673 } else {
674 false
675 }
676 });
677 self.send_project_updates(cx);
678 }
679
    /// Sets the flag recording whether the worktree order was changed explicitly. While it
    /// is set, newly added worktrees are appended instead of inserted by absolute path.
    pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
        self.worktrees_reordered = worktrees_reordered;
    }
683
684 fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
685 match &self.state {
686 WorktreeStoreState::Remote {
687 upstream_client,
688 upstream_project_id,
689 ..
690 } => Some((upstream_client.clone(), *upstream_project_id)),
691 WorktreeStoreState::Local { .. } => None,
692 }
693 }
694
695 pub fn set_worktrees_from_proto(
696 &mut self,
697 worktrees: Vec<proto::WorktreeMetadata>,
698 replica_id: ReplicaId,
699 cx: &mut Context<Self>,
700 ) -> Result<()> {
701 let mut old_worktrees_by_id = self
702 .worktrees
703 .drain(..)
704 .filter_map(|worktree| {
705 let worktree = worktree.upgrade()?;
706 Some((worktree.read(cx).id(), worktree))
707 })
708 .collect::<HashMap<_, _>>();
709
710 let (client, project_id) = self.upstream_client().context("invalid project")?;
711
712 for worktree in worktrees {
713 if let Some(old_worktree) =
714 old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
715 {
716 let push_strong_handle =
717 self.retain_worktrees || old_worktree.read(cx).is_visible();
718 let handle = if push_strong_handle {
719 WorktreeHandle::Strong(old_worktree.clone())
720 } else {
721 WorktreeHandle::Weak(old_worktree.downgrade())
722 };
723 self.worktrees.push(handle);
724 } else {
725 self.add(
726 &Worktree::remote(
727 project_id,
728 replica_id,
729 worktree,
730 client.clone(),
731 self.path_style(),
732 cx,
733 ),
734 cx,
735 );
736 }
737 }
738 self.send_project_updates(cx);
739
740 Ok(())
741 }
742
743 pub fn move_worktree(
744 &mut self,
745 source: WorktreeId,
746 destination: WorktreeId,
747 cx: &mut Context<Self>,
748 ) -> Result<()> {
749 if source == destination {
750 return Ok(());
751 }
752
753 let mut source_index = None;
754 let mut destination_index = None;
755 for (i, worktree) in self.worktrees.iter().enumerate() {
756 if let Some(worktree) = worktree.upgrade() {
757 let worktree_id = worktree.read(cx).id();
758 if worktree_id == source {
759 source_index = Some(i);
760 if destination_index.is_some() {
761 break;
762 }
763 } else if worktree_id == destination {
764 destination_index = Some(i);
765 if source_index.is_some() {
766 break;
767 }
768 }
769 }
770 }
771
772 let source_index =
773 source_index.with_context(|| format!("Missing worktree for id {source}"))?;
774 let destination_index =
775 destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
776
777 if source_index == destination_index {
778 return Ok(());
779 }
780
781 let worktree_to_move = self.worktrees.remove(source_index);
782 self.worktrees.insert(destination_index, worktree_to_move);
783 self.worktrees_reordered = true;
784 cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
785 cx.notify();
786 Ok(())
787 }
788
789 pub fn disconnected_from_host(&mut self, cx: &mut App) {
790 for worktree in &self.worktrees {
791 if let Some(worktree) = worktree.upgrade() {
792 worktree.update(cx, |worktree, _| {
793 if let Some(worktree) = worktree.as_remote_mut() {
794 worktree.disconnected_from_host();
795 }
796 });
797 }
798 }
799 }
800
    /// Sends the current worktree list to the downstream client and (re)installs update
    /// observation on every live worktree.
    ///
    /// Does nothing when the store has no downstream client. The work continues in a
    /// detached task whose errors are logged.
    pub fn send_project_updates(&mut self, cx: &mut Context<Self>) {
        let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
            return;
        };

        let update = proto::UpdateProject {
            project_id,
            worktrees: self.worktree_metadata_protos(cx),
        };

        // collab has bad concurrency guarantees, so we send requests in serial.
        let update_project = if downstream_client.is_via_collab() {
            Some(downstream_client.request(update))
        } else {
            downstream_client.send(update).log_err();
            None
        };
        cx.spawn(async move |this, cx| {
            // Via collab, wait for the project update to be acknowledged before continuing.
            if let Some(update_project) = update_project {
                update_project.await?;
            }

            this.update(cx, |this, cx| {
                let worktrees = this.worktrees().collect::<Vec<_>>();

                for worktree in worktrees {
                    worktree.update(cx, |worktree, cx| {
                        let client = downstream_client.clone();
                        // Forward each worktree update downstream; the callback resolves to
                        // `true` when the update was sent (or acknowledged) without error.
                        worktree.observe_updates(project_id, cx, {
                            move |update| {
                                let client = client.clone();
                                async move {
                                    if client.is_via_collab() {
                                        client
                                            .request(update)
                                            .map(|result| result.log_err().is_some())
                                            .await
                                    } else {
                                        client.send(update).log_err().is_some()
                                    }
                                }
                            }
                        });
                    });

                    cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
                }

                anyhow::Ok(())
            })
        })
        .detach_and_log_err(cx);
    }
854
855 pub fn worktree_metadata_protos(&self, cx: &App) -> Vec<proto::WorktreeMetadata> {
856 self.worktrees()
857 .map(|worktree| {
858 let worktree = worktree.read(cx);
859 proto::WorktreeMetadata {
860 id: worktree.id().to_proto(),
861 root_name: worktree.root_name_str().to_owned(),
862 visible: worktree.is_visible(),
863 abs_path: worktree.abs_path().to_string_lossy().into_owned(),
864 }
865 })
866 .collect()
867 }
868
869 pub fn shared(
870 &mut self,
871 remote_id: u64,
872 downstream_client: AnyProtoClient,
873 cx: &mut Context<Self>,
874 ) {
875 self.retain_worktrees = true;
876 self.downstream_client = Some((downstream_client, remote_id));
877
878 // When shared, retain all worktrees
879 for worktree_handle in self.worktrees.iter_mut() {
880 match worktree_handle {
881 WorktreeHandle::Strong(_) => {}
882 WorktreeHandle::Weak(worktree) => {
883 if let Some(worktree) = worktree.upgrade() {
884 *worktree_handle = WorktreeHandle::Strong(worktree);
885 }
886 }
887 }
888 }
889 self.send_project_updates(cx);
890 }
891
892 pub fn unshared(&mut self, cx: &mut Context<Self>) {
893 self.retain_worktrees = false;
894 self.downstream_client.take();
895
896 // When not shared, only retain the visible worktrees
897 for worktree_handle in self.worktrees.iter_mut() {
898 if let WorktreeHandle::Strong(worktree) = worktree_handle {
899 let is_visible = worktree.update(cx, |worktree, _| {
900 worktree.stop_observing_updates();
901 worktree.is_visible()
902 });
903 if !is_visible {
904 *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
905 }
906 }
907 }
908 }
909
    /// Searches over all visible local worktrees and returns the paths of files that *might*
    /// match `query`.
    ///
    /// Paths arrive on the returned channel in worktree order. Entries listed in
    /// `open_entries` are forwarded without having their on-disk content checked; all other
    /// candidates are checked against the file content read through `fs`. Forwarding stops
    /// once `limit` paths have been sent or the receiver is dropped. Note that the check is
    /// `matched == limit` after incrementing, so a `limit` of `0` never triggers it.
    pub fn find_search_candidates(
        &self,
        query: SearchQuery,
        limit: usize,
        open_entries: HashSet<ProjectEntryId>,
        fs: Arc<dyn Fs>,
        cx: &Context<Self>,
    ) -> Receiver<ProjectPath> {
        // Only worktrees that are visible and local contribute (`as_local()?` filters the rest).
        let snapshots = self
            .visible_worktrees(cx)
            .filter_map(|tree| {
                let tree = tree.read(cx);
                Some((tree.snapshot(), tree.as_local()?.settings()))
            })
            .collect::<Vec<_>>();

        let executor = cx.background_executor().clone();

        // We want to return entries in the order they are in the worktrees, so we have one
        // thread that iterates over the worktrees (and ignored directories) as necessary,
        // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
        // channel.
        // We spawn a number of workers that take items from the filter channel and check the query
        // against the version of the file on disk.
        let (filter_tx, filter_rx) = smol::channel::bounded(64);
        let (output_tx, output_rx) = smol::channel::bounded(64);
        let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();

        let input = cx.background_spawn({
            let fs = fs.clone();
            let query = query.clone();
            async move {
                Self::find_candidate_paths(
                    fs,
                    snapshots,
                    open_entries,
                    query,
                    filter_tx,
                    output_tx,
                )
                .await
                .log_err();
            }
        });
        const MAX_CONCURRENT_FILE_SCANS: usize = 64;
        let filters = cx.background_spawn(async move {
            let fs = &fs;
            let query = &query;
            executor
                .scoped(move |scope| {
                    for _ in 0..MAX_CONCURRENT_FILE_SCANS {
                        let filter_rx = filter_rx.clone();
                        scope.spawn(async move {
                            Self::filter_paths(fs, filter_rx, query)
                                .await
                                .log_with_level(log::Level::Debug);
                        })
                    }
                })
                .await;
        });
        cx.background_spawn(async move {
            let mut matched = 0;
            while let Ok(mut receiver) = output_rx.recv().await {
                // `None` means the candidate's sender was dropped without reporting a match.
                let Some(path) = receiver.next().await else {
                    continue;
                };
                let Ok(_) = matching_paths_tx.send(path).await else {
                    break;
                };
                matched += 1;
                if matched == limit {
                    break;
                }
            }
            // The producer and filter tasks are owned by this task and dropped explicitly here
            // once forwarding has finished.
            drop(input);
            drop(filters);
        })
        .detach();
        matching_paths_rx
    }
992
993 fn scan_ignored_dir<'a>(
994 fs: &'a Arc<dyn Fs>,
995 snapshot: &'a worktree::Snapshot,
996 path: &'a RelPath,
997 query: &'a SearchQuery,
998 filter_tx: &'a Sender<MatchingEntry>,
999 output_tx: &'a Sender<oneshot::Receiver<ProjectPath>>,
1000 ) -> BoxFuture<'a, Result<()>> {
1001 async move {
1002 let abs_path = snapshot.absolutize(path);
1003 let Some(mut files) = fs
1004 .read_dir(&abs_path)
1005 .await
1006 .with_context(|| format!("listing ignored path {abs_path:?}"))
1007 .log_err()
1008 else {
1009 return Ok(());
1010 };
1011
1012 let mut results = Vec::new();
1013
1014 while let Some(Ok(file)) = files.next().await {
1015 let Some(metadata) = fs
1016 .metadata(&file)
1017 .await
1018 .with_context(|| format!("fetching fs metadata for {abs_path:?}"))
1019 .log_err()
1020 .flatten()
1021 else {
1022 continue;
1023 };
1024 if metadata.is_symlink || metadata.is_fifo {
1025 continue;
1026 }
1027 let relative_path = file.strip_prefix(snapshot.abs_path())?;
1028 let relative_path = RelPath::new(&relative_path, snapshot.path_style())
1029 .context("getting relative path")?;
1030 results.push((relative_path.into_arc(), !metadata.is_dir))
1031 }
1032 results.sort_by(|(a_path, _), (b_path, _)| a_path.cmp(b_path));
1033 for (path, is_file) in results {
1034 if is_file {
1035 if query.filters_path() {
1036 let matched_path = if query.match_full_paths() {
1037 let mut full_path = snapshot.root_name().as_std_path().to_owned();
1038 full_path.push(path.as_std_path());
1039 query.match_path(&full_path)
1040 } else {
1041 query.match_path(&path.as_std_path())
1042 };
1043 if !matched_path {
1044 continue;
1045 }
1046 }
1047 let (tx, rx) = oneshot::channel();
1048 output_tx.send(rx).await?;
1049 filter_tx
1050 .send(MatchingEntry {
1051 respond: tx,
1052 worktree_root: snapshot.abs_path().clone(),
1053 path: ProjectPath {
1054 worktree_id: snapshot.id(),
1055 path: path.into_arc(),
1056 },
1057 })
1058 .await?;
1059 } else {
1060 Self::scan_ignored_dir(fs, snapshot, &path, query, filter_tx, output_tx)
1061 .await?;
1062 }
1063 }
1064 Ok(())
1065 }
1066 .boxed()
1067 }
1068
    /// Walks the entries of every snapshot and queues the files that may match `query`.
    ///
    /// For each candidate a one-shot receiver is pushed to `output_tx`. Entries contained in
    /// `open_entries` are answered immediately on their one-shot channel; all others are sent
    /// to `filter_tx` for content filtering. Ignored directories that are not excluded by the
    /// worktree settings are scanned through `scan_ignored_dir`.
    ///
    /// # Errors
    ///
    /// Fails when one of the channels is closed or when scanning an ignored directory fails.
    async fn find_candidate_paths(
        fs: Arc<dyn Fs>,
        snapshots: Vec<(worktree::Snapshot, WorktreeSettings)>,
        open_entries: HashSet<ProjectEntryId>,
        query: SearchQuery,
        filter_tx: Sender<MatchingEntry>,
        output_tx: Sender<oneshot::Receiver<ProjectPath>>,
    ) -> Result<()> {
        for (snapshot, settings) in snapshots {
            for entry in snapshot.entries(query.include_ignored(), 0) {
                // Ignored directories are listed from the file system instead of the snapshot.
                if entry.is_dir() && entry.is_ignored {
                    if !settings.is_path_excluded(&entry.path) {
                        Self::scan_ignored_dir(
                            &fs,
                            &snapshot,
                            &entry.path,
                            &query,
                            &filter_tx,
                            &output_tx,
                        )
                        .await?;
                    }
                    continue;
                }

                // Only regular files are search candidates.
                if entry.is_fifo || !entry.is_file() {
                    continue;
                }

                if query.filters_path() {
                    let matched_path = if query.match_full_paths() {
                        let mut full_path = snapshot.root_name().as_std_path().to_owned();
                        full_path.push(entry.path.as_std_path());
                        query.match_path(&full_path)
                    } else {
                        query.match_path(entry.path.as_std_path())
                    };
                    if !matched_path {
                        continue;
                    }
                }

                let (mut tx, rx) = oneshot::channel();

                if open_entries.contains(&entry.id) {
                    // Open entries are reported as candidates without checking the file on disk.
                    tx.send(ProjectPath {
                        worktree_id: snapshot.id(),
                        path: entry.path.clone(),
                    })
                    .await?;
                } else {
                    filter_tx
                        .send(MatchingEntry {
                            respond: tx,
                            worktree_root: snapshot.abs_path().clone(),
                            path: ProjectPath {
                                worktree_id: snapshot.id(),
                                path: entry.path.clone(),
                            },
                        })
                        .await?;
                }

                output_tx.send(rx).await?;
            }
        }
        Ok(())
    }
1137
1138 async fn filter_paths(
1139 fs: &Arc<dyn Fs>,
1140 input: Receiver<MatchingEntry>,
1141 query: &SearchQuery,
1142 ) -> Result<()> {
1143 let mut input = pin!(input);
1144 while let Some(mut entry) = input.next().await {
1145 let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
1146 let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
1147 continue;
1148 };
1149
1150 let mut file = BufReader::new(file);
1151 let file_start = file.fill_buf()?;
1152
1153 if let Err(Some(starting_position)) =
1154 std::str::from_utf8(file_start).map_err(|e| e.error_len())
1155 {
1156 // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
1157 // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
1158 log::debug!(
1159 "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
1160 );
1161 continue;
1162 }
1163
1164 if query.detect(file).unwrap_or(false) {
1165 entry.respond.send(entry.path).await?
1166 }
1167 }
1168
1169 Ok(())
1170 }
1171
1172 pub async fn handle_create_project_entry(
1173 this: Entity<Self>,
1174 envelope: TypedEnvelope<proto::CreateProjectEntry>,
1175 mut cx: AsyncApp,
1176 ) -> Result<proto::ProjectEntryResponse> {
1177 let worktree = this.update(&mut cx, |this, cx| {
1178 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
1179 this.worktree_for_id(worktree_id, cx)
1180 .context("worktree not found")
1181 })??;
1182 Worktree::handle_create_entry(worktree, envelope.payload, cx).await
1183 }
1184
1185 pub async fn handle_copy_project_entry(
1186 this: Entity<Self>,
1187 envelope: TypedEnvelope<proto::CopyProjectEntry>,
1188 mut cx: AsyncApp,
1189 ) -> Result<proto::ProjectEntryResponse> {
1190 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1191 let new_worktree_id = WorktreeId::from_proto(envelope.payload.new_worktree_id);
1192 let new_project_path = (
1193 new_worktree_id,
1194 RelPath::from_proto(&envelope.payload.new_path)?,
1195 );
1196 let (scan_id, entry) = this.update(&mut cx, |this, cx| {
1197 let new_worktree = this
1198 .worktree_for_id(new_worktree_id, cx)
1199 .context("no such worktree")?;
1200 let scan_id = new_worktree.read(cx).scan_id();
1201 anyhow::Ok((
1202 scan_id,
1203 this.copy_entry(entry_id, new_project_path.into(), cx),
1204 ))
1205 })??;
1206 let entry = entry.await?;
1207 Ok(proto::ProjectEntryResponse {
1208 entry: entry.as_ref().map(|entry| entry.into()),
1209 worktree_scan_id: scan_id as u64,
1210 })
1211 }
1212
1213 pub async fn handle_delete_project_entry(
1214 this: Entity<Self>,
1215 envelope: TypedEnvelope<proto::DeleteProjectEntry>,
1216 mut cx: AsyncApp,
1217 ) -> Result<proto::ProjectEntryResponse> {
1218 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1219 let worktree = this.update(&mut cx, |this, cx| {
1220 this.worktree_for_entry(entry_id, cx)
1221 .context("worktree not found")
1222 })??;
1223 Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
1224 }
1225
1226 pub async fn handle_rename_project_entry(
1227 this: Entity<Self>,
1228 request: proto::RenameProjectEntry,
1229 mut cx: AsyncApp,
1230 ) -> Result<proto::ProjectEntryResponse> {
1231 let entry_id = ProjectEntryId::from_proto(request.entry_id);
1232 let new_worktree_id = WorktreeId::from_proto(request.new_worktree_id);
1233 let rel_path = RelPath::from_proto(&request.new_path)
1234 .with_context(|| format!("received invalid relative path {:?}", &request.new_path))?;
1235
1236 let (scan_id, task) = this.update(&mut cx, |this, cx| {
1237 let worktree = this
1238 .worktree_for_entry(entry_id, cx)
1239 .context("no such worktree")?;
1240 let scan_id = worktree.read(cx).scan_id();
1241 anyhow::Ok((
1242 scan_id,
1243 this.rename_entry(entry_id, (new_worktree_id, rel_path).into(), cx),
1244 ))
1245 })??;
1246 Ok(proto::ProjectEntryResponse {
1247 entry: match &task.await? {
1248 CreatedEntry::Included(entry) => Some(entry.into()),
1249 CreatedEntry::Excluded { .. } => None,
1250 },
1251 worktree_scan_id: scan_id as u64,
1252 })
1253 }
1254
1255 pub async fn handle_expand_project_entry(
1256 this: Entity<Self>,
1257 envelope: TypedEnvelope<proto::ExpandProjectEntry>,
1258 mut cx: AsyncApp,
1259 ) -> Result<proto::ExpandProjectEntryResponse> {
1260 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1261 let worktree = this
1262 .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1263 .context("invalid request")?;
1264 Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
1265 }
1266
1267 pub async fn handle_expand_all_for_project_entry(
1268 this: Entity<Self>,
1269 envelope: TypedEnvelope<proto::ExpandAllForProjectEntry>,
1270 mut cx: AsyncApp,
1271 ) -> Result<proto::ExpandAllForProjectEntryResponse> {
1272 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1273 let worktree = this
1274 .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1275 .context("invalid request")?;
1276 Worktree::handle_expand_all_for_entry(worktree, envelope.payload, cx).await
1277 }
1278
1279 pub fn fs(&self) -> Option<Arc<dyn Fs>> {
1280 match &self.state {
1281 WorktreeStoreState::Local { fs } => Some(fs.clone()),
1282 WorktreeStoreState::Remote { .. } => None,
1283 }
1284 }
1285}
1286
1287#[derive(Clone, Debug)]
1288enum WorktreeHandle {
1289 Strong(Entity<Worktree>),
1290 Weak(WeakEntity<Worktree>),
1291}
1292
1293impl WorktreeHandle {
1294 fn upgrade(&self) -> Option<Entity<Worktree>> {
1295 match self {
1296 WorktreeHandle::Strong(handle) => Some(handle.clone()),
1297 WorktreeHandle::Weak(handle) => handle.upgrade(),
1298 }
1299 }
1300}