1use std::{
2 path::{Path, PathBuf},
3 pin::pin,
4 sync::{Arc, atomic::AtomicUsize},
5};
6
7use anyhow::{Context as _, Result, anyhow, bail};
8use collections::{HashMap, HashSet};
9use fs::{Fs, copy_recursive};
10use futures::{AsyncBufReadExt as _, FutureExt, SinkExt, future::Shared, io::BufReader};
11use gpui::{
12 App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity,
13};
14use postage::oneshot;
15use rpc::{
16 AnyProtoClient, ErrorExt, TypedEnvelope,
17 proto::{self, REMOTE_SERVER_PROJECT_ID},
18};
19use smol::{
20 channel::{Receiver, Sender},
21 stream::StreamExt,
22};
23use text::ReplicaId;
24use util::{
25 ResultExt,
26 paths::{PathStyle, RemotePathBuf, SanitizedPath},
27 rel_path::RelPath,
28};
29use worktree::{
30 CreatedEntry, Entry, ProjectEntryId, UpdatedEntriesSet, UpdatedGitRepositoriesSet, Worktree,
31 WorktreeId, WorktreeSettings,
32};
33
34use crate::{ProjectPath, search::SearchQuery};
35
36struct MatchingEntry {
37 worktree_root: Arc<Path>,
38 path: ProjectPath,
39 respond: oneshot::Sender<ProjectPath>,
40}
41
42enum WorktreeStoreState {
43 Local {
44 fs: Arc<dyn Fs>,
45 },
46 Remote {
47 upstream_client: AnyProtoClient,
48 upstream_project_id: u64,
49 path_style: PathStyle,
50 },
51}
52
53pub struct WorktreeStore {
54 next_entry_id: Arc<AtomicUsize>,
55 downstream_client: Option<(AnyProtoClient, u64)>,
56 retain_worktrees: bool,
57 worktrees: Vec<WorktreeHandle>,
58 worktrees_reordered: bool,
59 scanning_enabled: bool,
60 #[allow(clippy::type_complexity)]
61 loading_worktrees:
62 HashMap<Arc<SanitizedPath>, Shared<Task<Result<Entity<Worktree>, Arc<anyhow::Error>>>>>,
63 state: WorktreeStoreState,
64}
65
66#[derive(Debug)]
67pub enum WorktreeStoreEvent {
68 WorktreeAdded(Entity<Worktree>),
69 WorktreeRemoved(EntityId, WorktreeId),
70 WorktreeReleased(EntityId, WorktreeId),
71 WorktreeOrderChanged,
72 WorktreeUpdateSent(Entity<Worktree>),
73 WorktreeUpdatedEntries(WorktreeId, UpdatedEntriesSet),
74 WorktreeUpdatedGitRepositories(WorktreeId, UpdatedGitRepositoriesSet),
75 WorktreeDeletedEntry(WorktreeId, ProjectEntryId),
76}
77
78impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
79
80impl WorktreeStore {
81 pub fn init(client: &AnyProtoClient) {
82 client.add_entity_request_handler(Self::handle_create_project_entry);
83 client.add_entity_request_handler(Self::handle_copy_project_entry);
84 client.add_entity_request_handler(Self::handle_delete_project_entry);
85 client.add_entity_request_handler(Self::handle_expand_project_entry);
86 client.add_entity_request_handler(Self::handle_expand_all_for_project_entry);
87 }
88
89 pub fn local(retain_worktrees: bool, fs: Arc<dyn Fs>) -> Self {
90 Self {
91 next_entry_id: Default::default(),
92 loading_worktrees: Default::default(),
93 downstream_client: None,
94 worktrees: Vec::new(),
95 worktrees_reordered: false,
96 scanning_enabled: true,
97 retain_worktrees,
98 state: WorktreeStoreState::Local { fs },
99 }
100 }
101
102 pub fn remote(
103 retain_worktrees: bool,
104 upstream_client: AnyProtoClient,
105 upstream_project_id: u64,
106 path_style: PathStyle,
107 ) -> Self {
108 Self {
109 next_entry_id: Default::default(),
110 loading_worktrees: Default::default(),
111 downstream_client: None,
112 worktrees: Vec::new(),
113 worktrees_reordered: false,
114 scanning_enabled: true,
115 retain_worktrees,
116 state: WorktreeStoreState::Remote {
117 upstream_client,
118 upstream_project_id,
119 path_style,
120 },
121 }
122 }
123
124 pub fn disable_scanner(&mut self) {
125 self.scanning_enabled = false;
126 }
127
128 /// Iterates through all worktrees, including ones that don't appear in the project panel
129 pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Entity<Worktree>> {
130 self.worktrees
131 .iter()
132 .filter_map(move |worktree| worktree.upgrade())
133 }
134
135 /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
136 pub fn visible_worktrees<'a>(
137 &'a self,
138 cx: &'a App,
139 ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
140 self.worktrees()
141 .filter(|worktree| worktree.read(cx).is_visible())
142 }
143
144 /// Iterates through all user-visible worktrees (directories and files that appear in the project panel) and other, invisible single files that could appear e.g. due to drag and drop.
145 pub fn visible_worktrees_and_single_files<'a>(
146 &'a self,
147 cx: &'a App,
148 ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
149 self.worktrees()
150 .filter(|worktree| worktree.read(cx).is_visible() || worktree.read(cx).is_single_file())
151 }
152
153 pub fn worktree_for_id(&self, id: WorktreeId, cx: &App) -> Option<Entity<Worktree>> {
154 self.worktrees()
155 .find(|worktree| worktree.read(cx).id() == id)
156 }
157
158 pub fn worktree_for_entry(
159 &self,
160 entry_id: ProjectEntryId,
161 cx: &App,
162 ) -> Option<Entity<Worktree>> {
163 self.worktrees()
164 .find(|worktree| worktree.read(cx).contains_entry(entry_id))
165 }
166
167 pub fn find_worktree(
168 &self,
169 abs_path: impl AsRef<Path>,
170 cx: &App,
171 ) -> Option<(Entity<Worktree>, Arc<RelPath>)> {
172 let abs_path = SanitizedPath::new(abs_path.as_ref());
173 for tree in self.worktrees() {
174 let path_style = tree.read(cx).path_style();
175 if let Ok(relative_path) = abs_path.as_ref().strip_prefix(tree.read(cx).abs_path())
176 && let Ok(relative_path) = RelPath::new(relative_path, path_style)
177 {
178 return Some((tree.clone(), relative_path.into_arc()));
179 }
180 }
181 None
182 }
183
184 pub fn absolutize(&self, project_path: &ProjectPath, cx: &App) -> Option<PathBuf> {
185 let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
186 Some(worktree.read(cx).absolutize(&project_path.path))
187 }
188
189 pub fn path_style(&self) -> PathStyle {
190 match &self.state {
191 WorktreeStoreState::Local { .. } => PathStyle::local(),
192 WorktreeStoreState::Remote { path_style, .. } => *path_style,
193 }
194 }
195
196 pub fn find_or_create_worktree(
197 &mut self,
198 abs_path: impl AsRef<Path>,
199 visible: bool,
200 cx: &mut Context<Self>,
201 ) -> Task<Result<(Entity<Worktree>, Arc<RelPath>)>> {
202 let abs_path = abs_path.as_ref();
203 if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
204 Task::ready(Ok((tree, relative_path)))
205 } else {
206 let worktree = self.create_worktree(abs_path, visible, cx);
207 cx.background_spawn(async move { Ok((worktree.await?, RelPath::empty().into())) })
208 }
209 }
210
211 pub fn entry_for_id<'a>(&'a self, entry_id: ProjectEntryId, cx: &'a App) -> Option<&'a Entry> {
212 self.worktrees()
213 .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
214 }
215
216 pub fn worktree_and_entry_for_id<'a>(
217 &'a self,
218 entry_id: ProjectEntryId,
219 cx: &'a App,
220 ) -> Option<(Entity<Worktree>, &'a Entry)> {
221 self.worktrees().find_map(|worktree| {
222 worktree
223 .read(cx)
224 .entry_for_id(entry_id)
225 .map(|e| (worktree.clone(), e))
226 })
227 }
228
229 pub fn entry_for_path<'a>(&'a self, path: &ProjectPath, cx: &'a App) -> Option<&'a Entry> {
230 self.worktree_for_id(path.worktree_id, cx)?
231 .read(cx)
232 .entry_for_path(&path.path)
233 }
234
235 pub fn copy_entry(
236 &mut self,
237 entry_id: ProjectEntryId,
238 new_project_path: ProjectPath,
239 cx: &mut Context<Self>,
240 ) -> Task<Result<Option<Entry>>> {
241 let Some(old_worktree) = self.worktree_for_entry(entry_id, cx) else {
242 return Task::ready(Err(anyhow!("no such worktree")));
243 };
244 let Some(old_entry) = old_worktree.read(cx).entry_for_id(entry_id) else {
245 return Task::ready(Err(anyhow!("no such entry")));
246 };
247 let Some(new_worktree) = self.worktree_for_id(new_project_path.worktree_id, cx) else {
248 return Task::ready(Err(anyhow!("no such worktree")));
249 };
250
251 match &self.state {
252 WorktreeStoreState::Local { fs } => {
253 let old_abs_path = old_worktree.read(cx).absolutize(&old_entry.path);
254 let new_abs_path = new_worktree.read(cx).absolutize(&new_project_path.path);
255 let fs = fs.clone();
256 let copy = cx.background_spawn(async move {
257 copy_recursive(
258 fs.as_ref(),
259 &old_abs_path,
260 &new_abs_path,
261 Default::default(),
262 )
263 .await
264 });
265
266 cx.spawn(async move |_, cx| {
267 copy.await?;
268 new_worktree
269 .update(cx, |this, cx| {
270 this.as_local_mut().unwrap().refresh_entry(
271 new_project_path.path,
272 None,
273 cx,
274 )
275 })?
276 .await
277 })
278 }
279 WorktreeStoreState::Remote {
280 upstream_client,
281 upstream_project_id,
282 ..
283 } => {
284 let response = upstream_client.request(proto::CopyProjectEntry {
285 project_id: *upstream_project_id,
286 entry_id: entry_id.to_proto(),
287 new_path: new_project_path.path.to_proto(),
288 new_worktree_id: new_project_path.worktree_id.to_proto(),
289 });
290 cx.spawn(async move |_, cx| {
291 let response = response.await?;
292 match response.entry {
293 Some(entry) => new_worktree
294 .update(cx, |worktree, cx| {
295 worktree.as_remote_mut().unwrap().insert_entry(
296 entry,
297 response.worktree_scan_id as usize,
298 cx,
299 )
300 })?
301 .await
302 .map(Some),
303 None => Ok(None),
304 }
305 })
306 }
307 }
308 }
309
310 pub fn rename_entry(
311 &mut self,
312 entry_id: ProjectEntryId,
313 new_project_path: ProjectPath,
314 cx: &mut Context<Self>,
315 ) -> Task<Result<CreatedEntry>> {
316 let Some(old_worktree) = self.worktree_for_entry(entry_id, cx) else {
317 return Task::ready(Err(anyhow!("no such worktree")));
318 };
319 let Some(old_entry) = old_worktree.read(cx).entry_for_id(entry_id).cloned() else {
320 return Task::ready(Err(anyhow!("no such entry")));
321 };
322 let Some(new_worktree) = self.worktree_for_id(new_project_path.worktree_id, cx) else {
323 return Task::ready(Err(anyhow!("no such worktree")));
324 };
325
326 match &self.state {
327 WorktreeStoreState::Local { fs } => {
328 let abs_old_path = old_worktree.read(cx).absolutize(&old_entry.path);
329 let new_worktree_ref = new_worktree.read(cx);
330 let is_root_entry = new_worktree_ref
331 .root_entry()
332 .is_some_and(|e| e.id == entry_id);
333 let abs_new_path = if is_root_entry {
334 let abs_path = new_worktree_ref.abs_path();
335 let Some(root_parent_path) = abs_path.parent() else {
336 return Task::ready(Err(anyhow!("no parent for path {:?}", abs_path)));
337 };
338 root_parent_path.join(new_project_path.path.as_std_path())
339 } else {
340 new_worktree_ref.absolutize(&new_project_path.path)
341 };
342
343 let fs = fs.clone();
344 let case_sensitive = new_worktree
345 .read(cx)
346 .as_local()
347 .unwrap()
348 .fs_is_case_sensitive();
349
350 let do_rename =
351 async move |fs: &dyn Fs, old_path: &Path, new_path: &Path, overwrite| {
352 fs.rename(
353 &old_path,
354 &new_path,
355 fs::RenameOptions {
356 overwrite,
357 ..fs::RenameOptions::default()
358 },
359 )
360 .await
361 .with_context(|| format!("renaming {old_path:?} into {new_path:?}"))
362 };
363
364 let rename = cx.background_spawn({
365 let abs_new_path = abs_new_path.clone();
366 async move {
367 // If we're on a case-insensitive FS and we're doing a case-only rename (i.e. `foobar` to `FOOBAR`)
368 // we want to overwrite, because otherwise we run into a file-already-exists error.
369 let overwrite = !case_sensitive
370 && abs_old_path != abs_new_path
371 && abs_old_path.to_str().map(|p| p.to_lowercase())
372 == abs_new_path.to_str().map(|p| p.to_lowercase());
373
374 // The directory we're renaming into might not exist yet
375 if let Err(e) =
376 do_rename(fs.as_ref(), &abs_old_path, &abs_new_path, overwrite).await
377 {
378 if let Some(err) = e.downcast_ref::<std::io::Error>()
379 && err.kind() == std::io::ErrorKind::NotFound
380 {
381 if let Some(parent) = abs_new_path.parent() {
382 fs.create_dir(parent).await.with_context(|| {
383 format!("creating parent directory {parent:?}")
384 })?;
385 return do_rename(
386 fs.as_ref(),
387 &abs_old_path,
388 &abs_new_path,
389 overwrite,
390 )
391 .await;
392 }
393 }
394 return Err(e);
395 }
396 Ok(())
397 }
398 });
399
400 cx.spawn(async move |_, cx| {
401 rename.await?;
402 Ok(new_worktree
403 .update(cx, |this, cx| {
404 let local = this.as_local_mut().unwrap();
405 if is_root_entry {
406 // We eagerly update `abs_path` and refresh this worktree.
407 // Otherwise, the FS watcher would do it on the `RootUpdated` event,
408 // but with a noticeable delay, so we handle it proactively.
409 local.update_abs_path_and_refresh(
410 SanitizedPath::new_arc(&abs_new_path),
411 cx,
412 );
413 Task::ready(Ok(this.root_entry().cloned()))
414 } else {
415 // First refresh the parent directory (in case it was newly created)
416 if let Some(parent) = new_project_path.path.parent() {
417 let _ = local.refresh_entries_for_paths(vec![parent.into()]);
418 }
419 // Then refresh the new path
420 local.refresh_entry(
421 new_project_path.path.clone(),
422 Some(old_entry.path),
423 cx,
424 )
425 }
426 })?
427 .await?
428 .map(CreatedEntry::Included)
429 .unwrap_or_else(|| CreatedEntry::Excluded {
430 abs_path: abs_new_path,
431 }))
432 })
433 }
434 WorktreeStoreState::Remote {
435 upstream_client,
436 upstream_project_id,
437 ..
438 } => {
439 let response = upstream_client.request(proto::RenameProjectEntry {
440 project_id: *upstream_project_id,
441 entry_id: entry_id.to_proto(),
442 new_path: new_project_path.path.to_proto(),
443 new_worktree_id: new_project_path.worktree_id.to_proto(),
444 });
445 cx.spawn(async move |_, cx| {
446 let response = response.await?;
447 match response.entry {
448 Some(entry) => new_worktree
449 .update(cx, |worktree, cx| {
450 worktree.as_remote_mut().unwrap().insert_entry(
451 entry,
452 response.worktree_scan_id as usize,
453 cx,
454 )
455 })?
456 .await
457 .map(CreatedEntry::Included),
458 None => {
459 let abs_path = new_worktree.read_with(cx, |worktree, _| {
460 worktree.absolutize(&new_project_path.path)
461 })?;
462 Ok(CreatedEntry::Excluded { abs_path })
463 }
464 }
465 })
466 }
467 }
468 }
469 pub fn create_worktree(
470 &mut self,
471 abs_path: impl AsRef<Path>,
472 visible: bool,
473 cx: &mut Context<Self>,
474 ) -> Task<Result<Entity<Worktree>>> {
475 let abs_path: Arc<SanitizedPath> = SanitizedPath::new_arc(&abs_path);
476 if !self.loading_worktrees.contains_key(&abs_path) {
477 let task = match &self.state {
478 WorktreeStoreState::Remote {
479 upstream_client,
480 path_style,
481 ..
482 } => {
483 if upstream_client.is_via_collab() {
484 Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
485 } else {
486 let abs_path = RemotePathBuf::new(abs_path.to_string(), *path_style);
487 self.create_remote_worktree(upstream_client.clone(), abs_path, visible, cx)
488 }
489 }
490 WorktreeStoreState::Local { fs } => {
491 self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
492 }
493 };
494
495 self.loading_worktrees
496 .insert(abs_path.clone(), task.shared());
497 }
498 let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
499 cx.spawn(async move |this, cx| {
500 let result = task.await;
501 this.update(cx, |this, _| this.loading_worktrees.remove(&abs_path))
502 .ok();
503 match result {
504 Ok(worktree) => Ok(worktree),
505 Err(err) => Err((*err).cloned()),
506 }
507 })
508 }
509
510 fn create_remote_worktree(
511 &mut self,
512 client: AnyProtoClient,
513 abs_path: RemotePathBuf,
514 visible: bool,
515 cx: &mut Context<Self>,
516 ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
517 let path_style = abs_path.path_style();
518 let mut abs_path = abs_path.to_string();
519 // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
520 // in which case want to strip the leading the `/`.
521 // On the host-side, the `~` will get expanded.
522 // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
523 if abs_path.starts_with("/~") {
524 abs_path = abs_path[1..].to_string();
525 }
526 if abs_path.is_empty() {
527 abs_path = "~/".to_string();
528 }
529
530 cx.spawn(async move |this, cx| {
531 let this = this.upgrade().context("Dropped worktree store")?;
532
533 let path = RemotePathBuf::new(abs_path, path_style);
534 let response = client
535 .request(proto::AddWorktree {
536 project_id: REMOTE_SERVER_PROJECT_ID,
537 path: path.to_proto(),
538 visible,
539 })
540 .await?;
541
542 if let Some(existing_worktree) = this.read_with(cx, |this, cx| {
543 this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
544 })? {
545 return Ok(existing_worktree);
546 }
547
548 let root_path_buf = PathBuf::from(response.canonicalized_path.clone());
549 let root_name = root_path_buf
550 .file_name()
551 .map(|n| n.to_string_lossy().into_owned())
552 .unwrap_or(root_path_buf.to_string_lossy().into_owned());
553
554 let worktree = cx.update(|cx| {
555 Worktree::remote(
556 REMOTE_SERVER_PROJECT_ID,
557 ReplicaId::REMOTE_SERVER,
558 proto::WorktreeMetadata {
559 id: response.worktree_id,
560 root_name,
561 visible,
562 abs_path: response.canonicalized_path,
563 },
564 client,
565 path_style,
566 cx,
567 )
568 })?;
569
570 this.update(cx, |this, cx| {
571 this.add(&worktree, cx);
572 })?;
573 Ok(worktree)
574 })
575 }
576
577 fn create_local_worktree(
578 &mut self,
579 fs: Arc<dyn Fs>,
580 abs_path: Arc<SanitizedPath>,
581 visible: bool,
582 cx: &mut Context<Self>,
583 ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
584 let next_entry_id = self.next_entry_id.clone();
585 let scanning_enabled = self.scanning_enabled;
586
587 cx.spawn(async move |this, cx| {
588 let worktree = Worktree::local(
589 SanitizedPath::cast_arc(abs_path.clone()),
590 visible,
591 fs,
592 next_entry_id,
593 scanning_enabled,
594 cx,
595 )
596 .await;
597
598 let worktree = worktree?;
599
600 this.update(cx, |this, cx| this.add(&worktree, cx))?;
601
602 if visible {
603 cx.update(|cx| {
604 cx.add_recent_document(abs_path.as_path());
605 })
606 .log_err();
607 }
608
609 Ok(worktree)
610 })
611 }
612
613 pub fn add(&mut self, worktree: &Entity<Worktree>, cx: &mut Context<Self>) {
614 let worktree_id = worktree.read(cx).id();
615 debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));
616
617 let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
618 let handle = if push_strong_handle {
619 WorktreeHandle::Strong(worktree.clone())
620 } else {
621 WorktreeHandle::Weak(worktree.downgrade())
622 };
623 if self.worktrees_reordered {
624 self.worktrees.push(handle);
625 } else {
626 let i = match self
627 .worktrees
628 .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
629 other.upgrade().map(|worktree| worktree.read(cx).abs_path())
630 }) {
631 Ok(i) | Err(i) => i,
632 };
633 self.worktrees.insert(i, handle);
634 }
635
636 cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
637 self.send_project_updates(cx);
638
639 let handle_id = worktree.entity_id();
640 cx.subscribe(worktree, |_, worktree, event, cx| {
641 let worktree_id = worktree.read(cx).id();
642 match event {
643 worktree::Event::UpdatedEntries(changes) => {
644 cx.emit(WorktreeStoreEvent::WorktreeUpdatedEntries(
645 worktree_id,
646 changes.clone(),
647 ));
648 }
649 worktree::Event::UpdatedGitRepositories(set) => {
650 cx.emit(WorktreeStoreEvent::WorktreeUpdatedGitRepositories(
651 worktree_id,
652 set.clone(),
653 ));
654 }
655 worktree::Event::DeletedEntry(id) => {
656 cx.emit(WorktreeStoreEvent::WorktreeDeletedEntry(worktree_id, *id))
657 }
658 }
659 })
660 .detach();
661 cx.observe_release(worktree, move |this, worktree, cx| {
662 cx.emit(WorktreeStoreEvent::WorktreeReleased(
663 handle_id,
664 worktree.id(),
665 ));
666 cx.emit(WorktreeStoreEvent::WorktreeRemoved(
667 handle_id,
668 worktree.id(),
669 ));
670 this.send_project_updates(cx);
671 })
672 .detach();
673 }
674
675 pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut Context<Self>) {
676 self.worktrees.retain(|worktree| {
677 if let Some(worktree) = worktree.upgrade() {
678 if worktree.read(cx).id() == id_to_remove {
679 cx.emit(WorktreeStoreEvent::WorktreeRemoved(
680 worktree.entity_id(),
681 id_to_remove,
682 ));
683 false
684 } else {
685 true
686 }
687 } else {
688 false
689 }
690 });
691 self.send_project_updates(cx);
692 }
693
694 pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
695 self.worktrees_reordered = worktrees_reordered;
696 }
697
698 fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
699 match &self.state {
700 WorktreeStoreState::Remote {
701 upstream_client,
702 upstream_project_id,
703 ..
704 } => Some((upstream_client.clone(), *upstream_project_id)),
705 WorktreeStoreState::Local { .. } => None,
706 }
707 }
708
709 pub fn set_worktrees_from_proto(
710 &mut self,
711 worktrees: Vec<proto::WorktreeMetadata>,
712 replica_id: ReplicaId,
713 cx: &mut Context<Self>,
714 ) -> Result<()> {
715 let mut old_worktrees_by_id = self
716 .worktrees
717 .drain(..)
718 .filter_map(|worktree| {
719 let worktree = worktree.upgrade()?;
720 Some((worktree.read(cx).id(), worktree))
721 })
722 .collect::<HashMap<_, _>>();
723
724 let (client, project_id) = self.upstream_client().context("invalid project")?;
725
726 for worktree in worktrees {
727 if let Some(old_worktree) =
728 old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
729 {
730 let push_strong_handle =
731 self.retain_worktrees || old_worktree.read(cx).is_visible();
732 let handle = if push_strong_handle {
733 WorktreeHandle::Strong(old_worktree.clone())
734 } else {
735 WorktreeHandle::Weak(old_worktree.downgrade())
736 };
737 self.worktrees.push(handle);
738 } else {
739 self.add(
740 &Worktree::remote(
741 project_id,
742 replica_id,
743 worktree,
744 client.clone(),
745 self.path_style(),
746 cx,
747 ),
748 cx,
749 );
750 }
751 }
752 self.send_project_updates(cx);
753
754 Ok(())
755 }
756
757 pub fn move_worktree(
758 &mut self,
759 source: WorktreeId,
760 destination: WorktreeId,
761 cx: &mut Context<Self>,
762 ) -> Result<()> {
763 if source == destination {
764 return Ok(());
765 }
766
767 let mut source_index = None;
768 let mut destination_index = None;
769 for (i, worktree) in self.worktrees.iter().enumerate() {
770 if let Some(worktree) = worktree.upgrade() {
771 let worktree_id = worktree.read(cx).id();
772 if worktree_id == source {
773 source_index = Some(i);
774 if destination_index.is_some() {
775 break;
776 }
777 } else if worktree_id == destination {
778 destination_index = Some(i);
779 if source_index.is_some() {
780 break;
781 }
782 }
783 }
784 }
785
786 let source_index =
787 source_index.with_context(|| format!("Missing worktree for id {source}"))?;
788 let destination_index =
789 destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
790
791 if source_index == destination_index {
792 return Ok(());
793 }
794
795 let worktree_to_move = self.worktrees.remove(source_index);
796 self.worktrees.insert(destination_index, worktree_to_move);
797 self.worktrees_reordered = true;
798 cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
799 cx.notify();
800 Ok(())
801 }
802
803 pub fn disconnected_from_host(&mut self, cx: &mut App) {
804 for worktree in &self.worktrees {
805 if let Some(worktree) = worktree.upgrade() {
806 worktree.update(cx, |worktree, _| {
807 if let Some(worktree) = worktree.as_remote_mut() {
808 worktree.disconnected_from_host();
809 }
810 });
811 }
812 }
813 }
814
815 pub fn send_project_updates(&mut self, cx: &mut Context<Self>) {
816 let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
817 return;
818 };
819
820 let update = proto::UpdateProject {
821 project_id,
822 worktrees: self.worktree_metadata_protos(cx),
823 };
824
825 // collab has bad concurrency guarantees, so we send requests in serial.
826 let update_project = if downstream_client.is_via_collab() {
827 Some(downstream_client.request(update))
828 } else {
829 downstream_client.send(update).log_err();
830 None
831 };
832 cx.spawn(async move |this, cx| {
833 if let Some(update_project) = update_project {
834 update_project.await?;
835 }
836
837 this.update(cx, |this, cx| {
838 let worktrees = this.worktrees().collect::<Vec<_>>();
839
840 for worktree in worktrees {
841 worktree.update(cx, |worktree, cx| {
842 let client = downstream_client.clone();
843 worktree.observe_updates(project_id, cx, {
844 move |update| {
845 let client = client.clone();
846 async move {
847 if client.is_via_collab() {
848 client
849 .request(update)
850 .map(|result| result.log_err().is_some())
851 .await
852 } else {
853 client.send(update).log_err().is_some()
854 }
855 }
856 }
857 });
858 });
859
860 cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
861 }
862
863 anyhow::Ok(())
864 })
865 })
866 .detach_and_log_err(cx);
867 }
868
869 pub fn worktree_metadata_protos(&self, cx: &App) -> Vec<proto::WorktreeMetadata> {
870 self.worktrees()
871 .map(|worktree| {
872 let worktree = worktree.read(cx);
873 proto::WorktreeMetadata {
874 id: worktree.id().to_proto(),
875 root_name: worktree.root_name_str().to_owned(),
876 visible: worktree.is_visible(),
877 abs_path: worktree.abs_path().to_string_lossy().into_owned(),
878 }
879 })
880 .collect()
881 }
882
883 pub fn shared(
884 &mut self,
885 remote_id: u64,
886 downstream_client: AnyProtoClient,
887 cx: &mut Context<Self>,
888 ) {
889 self.retain_worktrees = true;
890 self.downstream_client = Some((downstream_client, remote_id));
891
892 // When shared, retain all worktrees
893 for worktree_handle in self.worktrees.iter_mut() {
894 match worktree_handle {
895 WorktreeHandle::Strong(_) => {}
896 WorktreeHandle::Weak(worktree) => {
897 if let Some(worktree) = worktree.upgrade() {
898 *worktree_handle = WorktreeHandle::Strong(worktree);
899 }
900 }
901 }
902 }
903 self.send_project_updates(cx);
904 }
905
906 pub fn unshared(&mut self, cx: &mut Context<Self>) {
907 self.retain_worktrees = false;
908 self.downstream_client.take();
909
910 // When not shared, only retain the visible worktrees
911 for worktree_handle in self.worktrees.iter_mut() {
912 if let WorktreeHandle::Strong(worktree) = worktree_handle {
913 let is_visible = worktree.update(cx, |worktree, _| {
914 worktree.stop_observing_updates();
915 worktree.is_visible()
916 });
917 if !is_visible {
918 *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
919 }
920 }
921 }
922 }
923
924 /// search over all worktrees and return buffers that *might* match the search.
925 pub fn find_search_candidates(
926 &self,
927 query: SearchQuery,
928 limit: usize,
929 open_entries: HashSet<ProjectEntryId>,
930 fs: Arc<dyn Fs>,
931 cx: &Context<Self>,
932 ) -> Receiver<ProjectPath> {
933 let snapshots = self
934 .visible_worktrees(cx)
935 .filter_map(|tree| {
936 let tree = tree.read(cx);
937 Some((tree.snapshot(), tree.as_local()?.settings()))
938 })
939 .collect::<Vec<_>>();
940
941 let executor = cx.background_executor().clone();
942
943 // We want to return entries in the order they are in the worktrees, so we have one
944 // thread that iterates over the worktrees (and ignored directories) as necessary,
945 // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
946 // channel.
947 // We spawn a number of workers that take items from the filter channel and check the query
948 // against the version of the file on disk.
949 let (filter_tx, filter_rx) = smol::channel::bounded(64);
950 let (output_tx, output_rx) = smol::channel::bounded(64);
951 let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();
952
953 let input = cx.background_spawn({
954 let fs = fs.clone();
955 let query = query.clone();
956 async move {
957 Self::find_candidate_paths(
958 fs,
959 snapshots,
960 open_entries,
961 query,
962 filter_tx,
963 output_tx,
964 )
965 .await
966 .log_err();
967 }
968 });
969 const MAX_CONCURRENT_FILE_SCANS: usize = 64;
970 let filters = cx.background_spawn(async move {
971 let fs = &fs;
972 let query = &query;
973 executor
974 .scoped(move |scope| {
975 for _ in 0..MAX_CONCURRENT_FILE_SCANS {
976 let filter_rx = filter_rx.clone();
977 scope.spawn(async move {
978 Self::filter_paths(fs, filter_rx, query)
979 .await
980 .log_with_level(log::Level::Debug);
981 })
982 }
983 })
984 .await;
985 });
986 cx.background_spawn(async move {
987 let mut matched = 0;
988 while let Ok(mut receiver) = output_rx.recv().await {
989 let Some(path) = receiver.next().await else {
990 continue;
991 };
992 let Ok(_) = matching_paths_tx.send(path).await else {
993 break;
994 };
995 matched += 1;
996 if matched == limit {
997 break;
998 }
999 }
1000 drop(input);
1001 drop(filters);
1002 })
1003 .detach();
1004 matching_paths_rx
1005 }
1006
1007 async fn find_candidate_paths(
1008 _: Arc<dyn Fs>,
1009 _: Vec<(worktree::Snapshot, WorktreeSettings)>,
1010 _: HashSet<ProjectEntryId>,
1011 _: SearchQuery,
1012 _: Sender<MatchingEntry>,
1013 _: Sender<oneshot::Receiver<ProjectPath>>,
1014 ) -> Result<()> {
1015 Ok(())
1016 }
1017
1018 async fn filter_paths(
1019 fs: &Arc<dyn Fs>,
1020 input: Receiver<MatchingEntry>,
1021 query: &SearchQuery,
1022 ) -> Result<()> {
1023 let mut input = pin!(input);
1024 while let Some(mut entry) = input.next().await {
1025 let abs_path = entry.worktree_root.join(entry.path.path.as_std_path());
1026 let Some(file) = fs.open_read(&abs_path).await.log_err() else {
1027 continue;
1028 };
1029
1030 let mut file = BufReader::new(file);
1031 let file_start = file.fill_buf().await?;
1032
1033 if let Err(Some(starting_position)) =
1034 std::str::from_utf8(&file_start).map_err(|e| e.error_len())
1035 {
1036 // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
1037 // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
1038 log::debug!(
1039 "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
1040 );
1041 continue;
1042 }
1043
1044 if query.detect(file).await.unwrap_or(false) {
1045 entry.respond.send(entry.path).await?
1046 }
1047 }
1048
1049 Ok(())
1050 }
1051
1052 pub async fn handle_create_project_entry(
1053 this: Entity<Self>,
1054 envelope: TypedEnvelope<proto::CreateProjectEntry>,
1055 mut cx: AsyncApp,
1056 ) -> Result<proto::ProjectEntryResponse> {
1057 let worktree = this.update(&mut cx, |this, cx| {
1058 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
1059 this.worktree_for_id(worktree_id, cx)
1060 .context("worktree not found")
1061 })??;
1062 Worktree::handle_create_entry(worktree, envelope.payload, cx).await
1063 }
1064
1065 pub async fn handle_copy_project_entry(
1066 this: Entity<Self>,
1067 envelope: TypedEnvelope<proto::CopyProjectEntry>,
1068 mut cx: AsyncApp,
1069 ) -> Result<proto::ProjectEntryResponse> {
1070 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1071 let new_worktree_id = WorktreeId::from_proto(envelope.payload.new_worktree_id);
1072 let new_project_path = (
1073 new_worktree_id,
1074 RelPath::from_proto(&envelope.payload.new_path)?,
1075 );
1076 let (scan_id, entry) = this.update(&mut cx, |this, cx| {
1077 let Some((_, project_id)) = this.downstream_client else {
1078 bail!("no downstream client")
1079 };
1080 let Some(entry) = this.entry_for_id(entry_id, cx) else {
1081 bail!("no such entry");
1082 };
1083 if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1084 bail!("entry is private")
1085 }
1086
1087 let new_worktree = this
1088 .worktree_for_id(new_worktree_id, cx)
1089 .context("no such worktree")?;
1090 let scan_id = new_worktree.read(cx).scan_id();
1091 anyhow::Ok((
1092 scan_id,
1093 this.copy_entry(entry_id, new_project_path.into(), cx),
1094 ))
1095 })??;
1096 let entry = entry.await?;
1097 Ok(proto::ProjectEntryResponse {
1098 entry: entry.as_ref().map(|entry| entry.into()),
1099 worktree_scan_id: scan_id as u64,
1100 })
1101 }
1102
1103 pub async fn handle_delete_project_entry(
1104 this: Entity<Self>,
1105 envelope: TypedEnvelope<proto::DeleteProjectEntry>,
1106 mut cx: AsyncApp,
1107 ) -> Result<proto::ProjectEntryResponse> {
1108 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1109 let worktree = this.update(&mut cx, |this, cx| {
1110 let Some((_, project_id)) = this.downstream_client else {
1111 bail!("no downstream client")
1112 };
1113 let Some(entry) = this.entry_for_id(entry_id, cx) else {
1114 bail!("no entry")
1115 };
1116 if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1117 bail!("entry is private")
1118 }
1119 this.worktree_for_entry(entry_id, cx)
1120 .context("worktree not found")
1121 })??;
1122 Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
1123 }
1124
1125 pub async fn handle_rename_project_entry(
1126 this: Entity<Self>,
1127 request: proto::RenameProjectEntry,
1128 mut cx: AsyncApp,
1129 ) -> Result<proto::ProjectEntryResponse> {
1130 let entry_id = ProjectEntryId::from_proto(request.entry_id);
1131 let new_worktree_id = WorktreeId::from_proto(request.new_worktree_id);
1132 let rel_path = RelPath::from_proto(&request.new_path)
1133 .with_context(|| format!("received invalid relative path {:?}", &request.new_path))?;
1134
1135 let (scan_id, task) = this.update(&mut cx, |this, cx| {
1136 let worktree = this
1137 .worktree_for_entry(entry_id, cx)
1138 .context("no such worktree")?;
1139
1140 let Some((_, project_id)) = this.downstream_client else {
1141 bail!("no downstream client")
1142 };
1143 let entry = worktree
1144 .read(cx)
1145 .entry_for_id(entry_id)
1146 .ok_or_else(|| anyhow!("missing entry"))?;
1147 if entry.is_private && project_id != REMOTE_SERVER_PROJECT_ID {
1148 bail!("entry is private")
1149 }
1150
1151 let scan_id = worktree.read(cx).scan_id();
1152 anyhow::Ok((
1153 scan_id,
1154 this.rename_entry(entry_id, (new_worktree_id, rel_path).into(), cx),
1155 ))
1156 })??;
1157 Ok(proto::ProjectEntryResponse {
1158 entry: match &task.await? {
1159 CreatedEntry::Included(entry) => Some(entry.into()),
1160 CreatedEntry::Excluded { .. } => None,
1161 },
1162 worktree_scan_id: scan_id as u64,
1163 })
1164 }
1165
1166 pub async fn handle_expand_project_entry(
1167 this: Entity<Self>,
1168 envelope: TypedEnvelope<proto::ExpandProjectEntry>,
1169 mut cx: AsyncApp,
1170 ) -> Result<proto::ExpandProjectEntryResponse> {
1171 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1172 let worktree = this
1173 .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1174 .context("invalid request")?;
1175 Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
1176 }
1177
1178 pub async fn handle_expand_all_for_project_entry(
1179 this: Entity<Self>,
1180 envelope: TypedEnvelope<proto::ExpandAllForProjectEntry>,
1181 mut cx: AsyncApp,
1182 ) -> Result<proto::ExpandAllForProjectEntryResponse> {
1183 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1184 let worktree = this
1185 .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1186 .context("invalid request")?;
1187 Worktree::handle_expand_all_for_entry(worktree, envelope.payload, cx).await
1188 }
1189
1190 pub fn fs(&self) -> Option<Arc<dyn Fs>> {
1191 match &self.state {
1192 WorktreeStoreState::Local { fs } => Some(fs.clone()),
1193 WorktreeStoreState::Remote { .. } => None,
1194 }
1195 }
1196}
1197
1198#[derive(Clone, Debug)]
1199enum WorktreeHandle {
1200 Strong(Entity<Worktree>),
1201 Weak(WeakEntity<Worktree>),
1202}
1203
1204impl WorktreeHandle {
1205 fn upgrade(&self) -> Option<Entity<Worktree>> {
1206 match self {
1207 WorktreeHandle::Strong(handle) => Some(handle.clone()),
1208 WorktreeHandle::Weak(handle) => handle.upgrade(),
1209 }
1210 }
1211}