1use std::{
2 io::{BufRead, BufReader},
3 path::{Path, PathBuf},
4 pin::pin,
5 sync::{atomic::AtomicUsize, Arc},
6};
7
8use anyhow::{anyhow, Context as _, Result};
9use collections::{HashMap, HashSet};
10use fs::Fs;
11use futures::{
12 future::{BoxFuture, Shared},
13 FutureExt, SinkExt,
14};
15use gpui::{App, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity};
16use postage::oneshot;
17use rpc::{
18 proto::{self, SSH_PROJECT_ID},
19 AnyProtoClient, ErrorExt, TypedEnvelope,
20};
21use smol::{
22 channel::{Receiver, Sender},
23 stream::StreamExt,
24};
25use text::ReplicaId;
26use util::{paths::SanitizedPath, ResultExt};
27use worktree::{Entry, ProjectEntryId, UpdatedEntriesSet, Worktree, WorktreeId, WorktreeSettings};
28
29use crate::{search::SearchQuery, ProjectPath};
30
31struct MatchingEntry {
32 worktree_path: Arc<Path>,
33 path: ProjectPath,
34 respond: oneshot::Sender<ProjectPath>,
35}
36
37enum WorktreeStoreState {
38 Local {
39 fs: Arc<dyn Fs>,
40 },
41 Remote {
42 upstream_client: AnyProtoClient,
43 upstream_project_id: u64,
44 },
45}
46
47pub struct WorktreeStore {
48 next_entry_id: Arc<AtomicUsize>,
49 downstream_client: Option<(AnyProtoClient, u64)>,
50 retain_worktrees: bool,
51 worktrees: Vec<WorktreeHandle>,
52 worktrees_reordered: bool,
53 #[allow(clippy::type_complexity)]
54 loading_worktrees:
55 HashMap<SanitizedPath, Shared<Task<Result<Entity<Worktree>, Arc<anyhow::Error>>>>>,
56 state: WorktreeStoreState,
57}
58
59pub enum WorktreeStoreEvent {
60 WorktreeAdded(Entity<Worktree>),
61 WorktreeRemoved(EntityId, WorktreeId),
62 WorktreeReleased(EntityId, WorktreeId),
63 WorktreeOrderChanged,
64 WorktreeUpdateSent(Entity<Worktree>),
65 WorktreeUpdatedEntries(WorktreeId, UpdatedEntriesSet),
66 WorktreeUpdatedGitRepositories(WorktreeId),
67 WorktreeDeletedEntry(WorktreeId, ProjectEntryId),
68}
69
70impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
71
72impl WorktreeStore {
73 pub fn init(client: &AnyProtoClient) {
74 client.add_model_request_handler(Self::handle_create_project_entry);
75 client.add_model_request_handler(Self::handle_copy_project_entry);
76 client.add_model_request_handler(Self::handle_delete_project_entry);
77 client.add_model_request_handler(Self::handle_expand_project_entry);
78 client.add_model_request_handler(Self::handle_expand_all_for_project_entry);
79 client.add_model_request_handler(Self::handle_git_branches);
80 client.add_model_request_handler(Self::handle_update_branch);
81 }
82
83 pub fn local(retain_worktrees: bool, fs: Arc<dyn Fs>) -> Self {
84 Self {
85 next_entry_id: Default::default(),
86 loading_worktrees: Default::default(),
87 downstream_client: None,
88 worktrees: Vec::new(),
89 worktrees_reordered: false,
90 retain_worktrees,
91 state: WorktreeStoreState::Local { fs },
92 }
93 }
94
95 pub fn remote(
96 retain_worktrees: bool,
97 upstream_client: AnyProtoClient,
98 upstream_project_id: u64,
99 ) -> Self {
100 Self {
101 next_entry_id: Default::default(),
102 loading_worktrees: Default::default(),
103 downstream_client: None,
104 worktrees: Vec::new(),
105 worktrees_reordered: false,
106 retain_worktrees,
107 state: WorktreeStoreState::Remote {
108 upstream_client,
109 upstream_project_id,
110 },
111 }
112 }
113
114 /// Iterates through all worktrees, including ones that don't appear in the project panel
115 pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Entity<Worktree>> {
116 self.worktrees
117 .iter()
118 .filter_map(move |worktree| worktree.upgrade())
119 }
120
121 /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
122 pub fn visible_worktrees<'a>(
123 &'a self,
124 cx: &'a App,
125 ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
126 self.worktrees()
127 .filter(|worktree| worktree.read(cx).is_visible())
128 }
129
130 pub fn worktree_for_id(&self, id: WorktreeId, cx: &App) -> Option<Entity<Worktree>> {
131 self.worktrees()
132 .find(|worktree| worktree.read(cx).id() == id)
133 }
134
135 pub fn current_branch(&self, repository: ProjectPath, cx: &App) -> Option<Arc<str>> {
136 self.worktree_for_id(repository.worktree_id, cx)?
137 .read(cx)
138 .git_entry(repository.path)?
139 .branch()
140 }
141
142 pub fn worktree_for_entry(
143 &self,
144 entry_id: ProjectEntryId,
145 cx: &App,
146 ) -> Option<Entity<Worktree>> {
147 self.worktrees()
148 .find(|worktree| worktree.read(cx).contains_entry(entry_id))
149 }
150
151 pub fn find_worktree(
152 &self,
153 abs_path: impl Into<SanitizedPath>,
154 cx: &App,
155 ) -> Option<(Entity<Worktree>, PathBuf)> {
156 let abs_path: SanitizedPath = abs_path.into();
157 for tree in self.worktrees() {
158 if let Ok(relative_path) = abs_path.as_path().strip_prefix(tree.read(cx).abs_path()) {
159 return Some((tree.clone(), relative_path.into()));
160 }
161 }
162 None
163 }
164
165 pub fn find_or_create_worktree(
166 &mut self,
167 abs_path: impl AsRef<Path>,
168 visible: bool,
169 cx: &mut Context<Self>,
170 ) -> Task<Result<(Entity<Worktree>, PathBuf)>> {
171 let abs_path = abs_path.as_ref();
172 if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
173 Task::ready(Ok((tree, relative_path)))
174 } else {
175 let worktree = self.create_worktree(abs_path, visible, cx);
176 cx.background_executor()
177 .spawn(async move { Ok((worktree.await?, PathBuf::new())) })
178 }
179 }
180
181 pub fn entry_for_id<'a>(&'a self, entry_id: ProjectEntryId, cx: &'a App) -> Option<&'a Entry> {
182 self.worktrees()
183 .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
184 }
185
186 pub fn worktree_and_entry_for_id<'a>(
187 &'a self,
188 entry_id: ProjectEntryId,
189 cx: &'a App,
190 ) -> Option<(Entity<Worktree>, &'a Entry)> {
191 self.worktrees().find_map(|worktree| {
192 worktree
193 .read(cx)
194 .entry_for_id(entry_id)
195 .map(|e| (worktree.clone(), e))
196 })
197 }
198
199 pub fn entry_for_path(&self, path: &ProjectPath, cx: &App) -> Option<Entry> {
200 self.worktree_for_id(path.worktree_id, cx)?
201 .read(cx)
202 .entry_for_path(&path.path)
203 .cloned()
204 }
205
206 pub fn create_worktree(
207 &mut self,
208 abs_path: impl Into<SanitizedPath>,
209 visible: bool,
210 cx: &mut Context<Self>,
211 ) -> Task<Result<Entity<Worktree>>> {
212 let abs_path: SanitizedPath = abs_path.into();
213 if !self.loading_worktrees.contains_key(&abs_path) {
214 let task = match &self.state {
215 WorktreeStoreState::Remote {
216 upstream_client, ..
217 } => {
218 if upstream_client.is_via_collab() {
219 Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
220 } else {
221 self.create_ssh_worktree(
222 upstream_client.clone(),
223 abs_path.clone(),
224 visible,
225 cx,
226 )
227 }
228 }
229 WorktreeStoreState::Local { fs } => {
230 self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
231 }
232 };
233
234 self.loading_worktrees
235 .insert(abs_path.clone(), task.shared());
236 }
237 let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
238 cx.spawn(|this, mut cx| async move {
239 let result = task.await;
240 this.update(&mut cx, |this, _| this.loading_worktrees.remove(&abs_path))
241 .ok();
242 match result {
243 Ok(worktree) => Ok(worktree),
244 Err(err) => Err((*err).cloned()),
245 }
246 })
247 }
248
249 fn create_ssh_worktree(
250 &mut self,
251 client: AnyProtoClient,
252 abs_path: impl Into<SanitizedPath>,
253 visible: bool,
254 cx: &mut Context<Self>,
255 ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
256 let mut abs_path = Into::<SanitizedPath>::into(abs_path).to_string();
257 // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
258 // in which case want to strip the leading the `/`.
259 // On the host-side, the `~` will get expanded.
260 // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
261 if abs_path.starts_with("/~") {
262 abs_path = abs_path[1..].to_string();
263 }
264 if abs_path.is_empty() || abs_path == "/" {
265 abs_path = "~/".to_string();
266 }
267 cx.spawn(|this, mut cx| async move {
268 let this = this.upgrade().context("Dropped worktree store")?;
269
270 let response = client
271 .request(proto::AddWorktree {
272 project_id: SSH_PROJECT_ID,
273 path: abs_path.clone(),
274 visible,
275 })
276 .await?;
277
278 if let Some(existing_worktree) = this.read_with(&cx, |this, cx| {
279 this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
280 })? {
281 return Ok(existing_worktree);
282 }
283
284 let root_name = PathBuf::from(&response.canonicalized_path)
285 .file_name()
286 .map(|n| n.to_string_lossy().to_string())
287 .unwrap_or(response.canonicalized_path.to_string());
288
289 let worktree = cx.update(|cx| {
290 Worktree::remote(
291 SSH_PROJECT_ID,
292 0,
293 proto::WorktreeMetadata {
294 id: response.worktree_id,
295 root_name,
296 visible,
297 abs_path: response.canonicalized_path,
298 },
299 client,
300 cx,
301 )
302 })?;
303
304 this.update(&mut cx, |this, cx| {
305 this.add(&worktree, cx);
306 })?;
307 Ok(worktree)
308 })
309 }
310
311 fn create_local_worktree(
312 &mut self,
313 fs: Arc<dyn Fs>,
314 abs_path: impl Into<SanitizedPath>,
315 visible: bool,
316 cx: &mut Context<Self>,
317 ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
318 let next_entry_id = self.next_entry_id.clone();
319 let path: SanitizedPath = abs_path.into();
320
321 cx.spawn(move |this, mut cx| async move {
322 let worktree = Worktree::local(path.clone(), visible, fs, next_entry_id, &mut cx).await;
323
324 let worktree = worktree?;
325
326 this.update(&mut cx, |this, cx| this.add(&worktree, cx))?;
327
328 if visible {
329 cx.update(|cx| {
330 cx.add_recent_document(path.as_path());
331 })
332 .log_err();
333 }
334
335 Ok(worktree)
336 })
337 }
338
339 pub fn add(&mut self, worktree: &Entity<Worktree>, cx: &mut Context<Self>) {
340 let worktree_id = worktree.read(cx).id();
341 debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));
342
343 let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
344 let handle = if push_strong_handle {
345 WorktreeHandle::Strong(worktree.clone())
346 } else {
347 WorktreeHandle::Weak(worktree.downgrade())
348 };
349 if self.worktrees_reordered {
350 self.worktrees.push(handle);
351 } else {
352 let i = match self
353 .worktrees
354 .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
355 other.upgrade().map(|worktree| worktree.read(cx).abs_path())
356 }) {
357 Ok(i) | Err(i) => i,
358 };
359 self.worktrees.insert(i, handle);
360 }
361
362 cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
363 self.send_project_updates(cx);
364
365 let handle_id = worktree.entity_id();
366 cx.subscribe(worktree, |_, worktree, event, cx| {
367 let worktree_id = worktree.update(cx, |worktree, _| worktree.id());
368 match event {
369 worktree::Event::UpdatedEntries(changes) => {
370 cx.emit(WorktreeStoreEvent::WorktreeUpdatedEntries(
371 worktree.read(cx).id(),
372 changes.clone(),
373 ));
374 }
375 worktree::Event::UpdatedGitRepositories(_) => {
376 cx.emit(WorktreeStoreEvent::WorktreeUpdatedGitRepositories(
377 worktree_id,
378 ));
379 }
380 worktree::Event::DeletedEntry(id) => {
381 cx.emit(WorktreeStoreEvent::WorktreeDeletedEntry(worktree_id, *id))
382 }
383 }
384 })
385 .detach();
386 cx.observe_release(worktree, move |this, worktree, cx| {
387 cx.emit(WorktreeStoreEvent::WorktreeReleased(
388 handle_id,
389 worktree.id(),
390 ));
391 cx.emit(WorktreeStoreEvent::WorktreeRemoved(
392 handle_id,
393 worktree.id(),
394 ));
395 this.send_project_updates(cx);
396 })
397 .detach();
398 }
399
400 pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut Context<Self>) {
401 self.worktrees.retain(|worktree| {
402 if let Some(worktree) = worktree.upgrade() {
403 if worktree.read(cx).id() == id_to_remove {
404 cx.emit(WorktreeStoreEvent::WorktreeRemoved(
405 worktree.entity_id(),
406 id_to_remove,
407 ));
408 false
409 } else {
410 true
411 }
412 } else {
413 false
414 }
415 });
416 self.send_project_updates(cx);
417 }
418
419 pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
420 self.worktrees_reordered = worktrees_reordered;
421 }
422
423 fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
424 match &self.state {
425 WorktreeStoreState::Remote {
426 upstream_client,
427 upstream_project_id,
428 ..
429 } => Some((upstream_client.clone(), *upstream_project_id)),
430 WorktreeStoreState::Local { .. } => None,
431 }
432 }
433
434 pub fn set_worktrees_from_proto(
435 &mut self,
436 worktrees: Vec<proto::WorktreeMetadata>,
437 replica_id: ReplicaId,
438 cx: &mut Context<Self>,
439 ) -> Result<()> {
440 let mut old_worktrees_by_id = self
441 .worktrees
442 .drain(..)
443 .filter_map(|worktree| {
444 let worktree = worktree.upgrade()?;
445 Some((worktree.read(cx).id(), worktree))
446 })
447 .collect::<HashMap<_, _>>();
448
449 let (client, project_id) = self
450 .upstream_client()
451 .clone()
452 .ok_or_else(|| anyhow!("invalid project"))?;
453
454 for worktree in worktrees {
455 if let Some(old_worktree) =
456 old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
457 {
458 let push_strong_handle =
459 self.retain_worktrees || old_worktree.read(cx).is_visible();
460 let handle = if push_strong_handle {
461 WorktreeHandle::Strong(old_worktree.clone())
462 } else {
463 WorktreeHandle::Weak(old_worktree.downgrade())
464 };
465 self.worktrees.push(handle);
466 } else {
467 self.add(
468 &Worktree::remote(project_id, replica_id, worktree, client.clone(), cx),
469 cx,
470 );
471 }
472 }
473 self.send_project_updates(cx);
474
475 Ok(())
476 }
477
478 pub fn move_worktree(
479 &mut self,
480 source: WorktreeId,
481 destination: WorktreeId,
482 cx: &mut Context<Self>,
483 ) -> Result<()> {
484 if source == destination {
485 return Ok(());
486 }
487
488 let mut source_index = None;
489 let mut destination_index = None;
490 for (i, worktree) in self.worktrees.iter().enumerate() {
491 if let Some(worktree) = worktree.upgrade() {
492 let worktree_id = worktree.read(cx).id();
493 if worktree_id == source {
494 source_index = Some(i);
495 if destination_index.is_some() {
496 break;
497 }
498 } else if worktree_id == destination {
499 destination_index = Some(i);
500 if source_index.is_some() {
501 break;
502 }
503 }
504 }
505 }
506
507 let source_index =
508 source_index.with_context(|| format!("Missing worktree for id {source}"))?;
509 let destination_index =
510 destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
511
512 if source_index == destination_index {
513 return Ok(());
514 }
515
516 let worktree_to_move = self.worktrees.remove(source_index);
517 self.worktrees.insert(destination_index, worktree_to_move);
518 self.worktrees_reordered = true;
519 cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
520 cx.notify();
521 Ok(())
522 }
523
524 pub fn disconnected_from_host(&mut self, cx: &mut App) {
525 for worktree in &self.worktrees {
526 if let Some(worktree) = worktree.upgrade() {
527 worktree.update(cx, |worktree, _| {
528 if let Some(worktree) = worktree.as_remote_mut() {
529 worktree.disconnected_from_host();
530 }
531 });
532 }
533 }
534 }
535
536 pub fn send_project_updates(&mut self, cx: &mut Context<Self>) {
537 let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
538 return;
539 };
540
541 let update = proto::UpdateProject {
542 project_id,
543 worktrees: self.worktree_metadata_protos(cx),
544 };
545
546 // collab has bad concurrency guarantees, so we send requests in serial.
547 let update_project = if downstream_client.is_via_collab() {
548 Some(downstream_client.request(update))
549 } else {
550 downstream_client.send(update).log_err();
551 None
552 };
553 cx.spawn(|this, mut cx| async move {
554 if let Some(update_project) = update_project {
555 update_project.await?;
556 }
557
558 this.update(&mut cx, |this, cx| {
559 let worktrees = this.worktrees().collect::<Vec<_>>();
560
561 for worktree in worktrees {
562 worktree.update(cx, |worktree, cx| {
563 let client = downstream_client.clone();
564 worktree.observe_updates(project_id, cx, {
565 move |update| {
566 let client = client.clone();
567 async move {
568 if client.is_via_collab() {
569 client
570 .request(update)
571 .map(|result| result.log_err().is_some())
572 .await
573 } else {
574 client.send(update).log_err().is_some()
575 }
576 }
577 }
578 });
579 });
580
581 cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
582 }
583
584 anyhow::Ok(())
585 })
586 })
587 .detach_and_log_err(cx);
588 }
589
590 pub fn worktree_metadata_protos(&self, cx: &App) -> Vec<proto::WorktreeMetadata> {
591 self.worktrees()
592 .map(|worktree| {
593 let worktree = worktree.read(cx);
594 proto::WorktreeMetadata {
595 id: worktree.id().to_proto(),
596 root_name: worktree.root_name().into(),
597 visible: worktree.is_visible(),
598 abs_path: worktree.abs_path().to_string_lossy().into(),
599 }
600 })
601 .collect()
602 }
603
604 pub fn shared(
605 &mut self,
606 remote_id: u64,
607 downstream_client: AnyProtoClient,
608 cx: &mut Context<Self>,
609 ) {
610 self.retain_worktrees = true;
611 self.downstream_client = Some((downstream_client, remote_id));
612
613 // When shared, retain all worktrees
614 for worktree_handle in self.worktrees.iter_mut() {
615 match worktree_handle {
616 WorktreeHandle::Strong(_) => {}
617 WorktreeHandle::Weak(worktree) => {
618 if let Some(worktree) = worktree.upgrade() {
619 *worktree_handle = WorktreeHandle::Strong(worktree);
620 }
621 }
622 }
623 }
624 self.send_project_updates(cx);
625 }
626
627 pub fn unshared(&mut self, cx: &mut Context<Self>) {
628 self.retain_worktrees = false;
629 self.downstream_client.take();
630
631 // When not shared, only retain the visible worktrees
632 for worktree_handle in self.worktrees.iter_mut() {
633 if let WorktreeHandle::Strong(worktree) = worktree_handle {
634 let is_visible = worktree.update(cx, |worktree, _| {
635 worktree.stop_observing_updates();
636 worktree.is_visible()
637 });
638 if !is_visible {
639 *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
640 }
641 }
642 }
643 }
644
645 /// search over all worktrees and return buffers that *might* match the search.
646 pub fn find_search_candidates(
647 &self,
648 query: SearchQuery,
649 limit: usize,
650 open_entries: HashSet<ProjectEntryId>,
651 fs: Arc<dyn Fs>,
652 cx: &Context<Self>,
653 ) -> Receiver<ProjectPath> {
654 let snapshots = self
655 .visible_worktrees(cx)
656 .filter_map(|tree| {
657 let tree = tree.read(cx);
658 Some((tree.snapshot(), tree.as_local()?.settings()))
659 })
660 .collect::<Vec<_>>();
661
662 let executor = cx.background_executor().clone();
663
664 // We want to return entries in the order they are in the worktrees, so we have one
665 // thread that iterates over the worktrees (and ignored directories) as necessary,
666 // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
667 // channel.
668 // We spawn a number of workers that take items from the filter channel and check the query
669 // against the version of the file on disk.
670 let (filter_tx, filter_rx) = smol::channel::bounded(64);
671 let (output_tx, output_rx) = smol::channel::bounded(64);
672 let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();
673
674 let input = cx.background_executor().spawn({
675 let fs = fs.clone();
676 let query = query.clone();
677 async move {
678 Self::find_candidate_paths(
679 fs,
680 snapshots,
681 open_entries,
682 query,
683 filter_tx,
684 output_tx,
685 )
686 .await
687 .log_err();
688 }
689 });
690 const MAX_CONCURRENT_FILE_SCANS: usize = 64;
691 let filters = cx.background_executor().spawn(async move {
692 let fs = &fs;
693 let query = &query;
694 executor
695 .scoped(move |scope| {
696 for _ in 0..MAX_CONCURRENT_FILE_SCANS {
697 let filter_rx = filter_rx.clone();
698 scope.spawn(async move {
699 Self::filter_paths(fs, filter_rx, query)
700 .await
701 .log_with_level(log::Level::Debug);
702 })
703 }
704 })
705 .await;
706 });
707 cx.background_executor()
708 .spawn(async move {
709 let mut matched = 0;
710 while let Ok(mut receiver) = output_rx.recv().await {
711 let Some(path) = receiver.next().await else {
712 continue;
713 };
714 let Ok(_) = matching_paths_tx.send(path).await else {
715 break;
716 };
717 matched += 1;
718 if matched == limit {
719 break;
720 }
721 }
722 drop(input);
723 drop(filters);
724 })
725 .detach();
726 matching_paths_rx
727 }
728
729 fn scan_ignored_dir<'a>(
730 fs: &'a Arc<dyn Fs>,
731 snapshot: &'a worktree::Snapshot,
732 path: &'a Path,
733 query: &'a SearchQuery,
734 include_root: bool,
735 filter_tx: &'a Sender<MatchingEntry>,
736 output_tx: &'a Sender<oneshot::Receiver<ProjectPath>>,
737 ) -> BoxFuture<'a, Result<()>> {
738 async move {
739 let abs_path = snapshot.abs_path().join(path);
740 let Some(mut files) = fs
741 .read_dir(&abs_path)
742 .await
743 .with_context(|| format!("listing ignored path {abs_path:?}"))
744 .log_err()
745 else {
746 return Ok(());
747 };
748
749 let mut results = Vec::new();
750
751 while let Some(Ok(file)) = files.next().await {
752 let Some(metadata) = fs
753 .metadata(&file)
754 .await
755 .with_context(|| format!("fetching fs metadata for {abs_path:?}"))
756 .log_err()
757 .flatten()
758 else {
759 continue;
760 };
761 if metadata.is_symlink || metadata.is_fifo {
762 continue;
763 }
764 results.push((
765 file.strip_prefix(snapshot.abs_path())?.to_path_buf(),
766 !metadata.is_dir,
767 ))
768 }
769 results.sort_by(|(a_path, _), (b_path, _)| a_path.cmp(b_path));
770 for (path, is_file) in results {
771 if is_file {
772 if query.filters_path() {
773 let matched_path = if include_root {
774 let mut full_path = PathBuf::from(snapshot.root_name());
775 full_path.push(&path);
776 query.file_matches(&full_path)
777 } else {
778 query.file_matches(&path)
779 };
780 if !matched_path {
781 continue;
782 }
783 }
784 let (tx, rx) = oneshot::channel();
785 output_tx.send(rx).await?;
786 filter_tx
787 .send(MatchingEntry {
788 respond: tx,
789 worktree_path: snapshot.abs_path().clone(),
790 path: ProjectPath {
791 worktree_id: snapshot.id(),
792 path: Arc::from(path),
793 },
794 })
795 .await?;
796 } else {
797 Self::scan_ignored_dir(
798 fs,
799 snapshot,
800 &path,
801 query,
802 include_root,
803 filter_tx,
804 output_tx,
805 )
806 .await?;
807 }
808 }
809 Ok(())
810 }
811 .boxed()
812 }
813
814 async fn find_candidate_paths(
815 fs: Arc<dyn Fs>,
816 snapshots: Vec<(worktree::Snapshot, WorktreeSettings)>,
817 open_entries: HashSet<ProjectEntryId>,
818 query: SearchQuery,
819 filter_tx: Sender<MatchingEntry>,
820 output_tx: Sender<oneshot::Receiver<ProjectPath>>,
821 ) -> Result<()> {
822 let include_root = snapshots.len() > 1;
823 for (snapshot, settings) in snapshots {
824 for entry in snapshot.entries(query.include_ignored(), 0) {
825 if entry.is_dir() && entry.is_ignored {
826 if !settings.is_path_excluded(&entry.path) {
827 Self::scan_ignored_dir(
828 &fs,
829 &snapshot,
830 &entry.path,
831 &query,
832 include_root,
833 &filter_tx,
834 &output_tx,
835 )
836 .await?;
837 }
838 continue;
839 }
840
841 if entry.is_fifo || !entry.is_file() {
842 continue;
843 }
844
845 if query.filters_path() {
846 let matched_path = if include_root {
847 let mut full_path = PathBuf::from(snapshot.root_name());
848 full_path.push(&entry.path);
849 query.file_matches(&full_path)
850 } else {
851 query.file_matches(&entry.path)
852 };
853 if !matched_path {
854 continue;
855 }
856 }
857
858 let (mut tx, rx) = oneshot::channel();
859
860 if open_entries.contains(&entry.id) {
861 tx.send(ProjectPath {
862 worktree_id: snapshot.id(),
863 path: entry.path.clone(),
864 })
865 .await?;
866 } else {
867 filter_tx
868 .send(MatchingEntry {
869 respond: tx,
870 worktree_path: snapshot.abs_path().clone(),
871 path: ProjectPath {
872 worktree_id: snapshot.id(),
873 path: entry.path.clone(),
874 },
875 })
876 .await?;
877 }
878
879 output_tx.send(rx).await?;
880 }
881 }
882 Ok(())
883 }
884
885 pub fn branches(
886 &self,
887 project_path: ProjectPath,
888 cx: &App,
889 ) -> Task<Result<Vec<git::repository::Branch>>> {
890 let Some(worktree) = self.worktree_for_id(project_path.worktree_id, cx) else {
891 return Task::ready(Err(anyhow!("No worktree found for ProjectPath")));
892 };
893
894 match worktree.read(cx) {
895 Worktree::Local(local_worktree) => {
896 let branches = util::maybe!({
897 let worktree_error = |error| {
898 format!(
899 "{} for worktree {}",
900 error,
901 local_worktree.abs_path().to_string_lossy()
902 )
903 };
904
905 let entry = local_worktree
906 .git_entry(project_path.path)
907 .with_context(|| worktree_error("No git entry found"))?;
908
909 let repo = local_worktree
910 .get_local_repo(&entry)
911 .with_context(|| worktree_error("No repository found"))?
912 .repo()
913 .clone();
914
915 repo.branches()
916 });
917
918 Task::ready(branches)
919 }
920 Worktree::Remote(remote_worktree) => {
921 let request = remote_worktree.client().request(proto::GitBranches {
922 project_id: remote_worktree.project_id(),
923 repository: Some(proto::ProjectPath {
924 worktree_id: project_path.worktree_id.to_proto(),
925 path: project_path.path.to_string_lossy().to_string(), // Root path
926 }),
927 });
928
929 cx.background_executor().spawn(async move {
930 let response = request.await?;
931
932 let branches = response
933 .branches
934 .into_iter()
935 .map(|proto_branch| git::repository::Branch {
936 is_head: proto_branch.is_head,
937 name: proto_branch.name.into(),
938 unix_timestamp: proto_branch
939 .unix_timestamp
940 .map(|timestamp| timestamp as i64),
941 })
942 .collect();
943
944 Ok(branches)
945 })
946 }
947 }
948 }
949
950 pub fn update_or_create_branch(
951 &self,
952 repository: ProjectPath,
953 new_branch: String,
954 cx: &App,
955 ) -> Task<Result<()>> {
956 let Some(worktree) = self.worktree_for_id(repository.worktree_id, cx) else {
957 return Task::ready(Err(anyhow!("No worktree found for ProjectPath")));
958 };
959
960 match worktree.read(cx) {
961 Worktree::Local(local_worktree) => {
962 let result = util::maybe!({
963 let worktree_error = |error| {
964 format!(
965 "{} for worktree {}",
966 error,
967 local_worktree.abs_path().to_string_lossy()
968 )
969 };
970
971 let entry = local_worktree
972 .git_entry(repository.path)
973 .with_context(|| worktree_error("No git entry found"))?;
974
975 let repo = local_worktree
976 .get_local_repo(&entry)
977 .with_context(|| worktree_error("No repository found"))?
978 .repo()
979 .clone();
980
981 if !repo.branch_exits(&new_branch)? {
982 repo.create_branch(&new_branch)?;
983 }
984
985 repo.change_branch(&new_branch)?;
986 Ok(())
987 });
988
989 Task::ready(result)
990 }
991 Worktree::Remote(remote_worktree) => {
992 let request = remote_worktree.client().request(proto::UpdateGitBranch {
993 project_id: remote_worktree.project_id(),
994 repository: Some(proto::ProjectPath {
995 worktree_id: repository.worktree_id.to_proto(),
996 path: repository.path.to_string_lossy().to_string(), // Root path
997 }),
998 branch_name: new_branch,
999 });
1000
1001 cx.background_executor().spawn(async move {
1002 request.await?;
1003 Ok(())
1004 })
1005 }
1006 }
1007 }
1008
1009 async fn filter_paths(
1010 fs: &Arc<dyn Fs>,
1011 mut input: Receiver<MatchingEntry>,
1012 query: &SearchQuery,
1013 ) -> Result<()> {
1014 let mut input = pin!(input);
1015 while let Some(mut entry) = input.next().await {
1016 let abs_path = entry.worktree_path.join(&entry.path.path);
1017 let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
1018 continue;
1019 };
1020
1021 let mut file = BufReader::new(file);
1022 let file_start = file.fill_buf()?;
1023
1024 if let Err(Some(starting_position)) =
1025 std::str::from_utf8(file_start).map_err(|e| e.error_len())
1026 {
1027 // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
1028 // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
1029 log::debug!(
1030 "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
1031 );
1032 continue;
1033 }
1034
1035 if query.detect(file).unwrap_or(false) {
1036 entry.respond.send(entry.path).await?
1037 }
1038 }
1039
1040 Ok(())
1041 }
1042
1043 pub async fn handle_create_project_entry(
1044 this: Entity<Self>,
1045 envelope: TypedEnvelope<proto::CreateProjectEntry>,
1046 mut cx: AsyncApp,
1047 ) -> Result<proto::ProjectEntryResponse> {
1048 let worktree = this.update(&mut cx, |this, cx| {
1049 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
1050 this.worktree_for_id(worktree_id, cx)
1051 .ok_or_else(|| anyhow!("worktree not found"))
1052 })??;
1053 Worktree::handle_create_entry(worktree, envelope.payload, cx).await
1054 }
1055
1056 pub async fn handle_copy_project_entry(
1057 this: Entity<Self>,
1058 envelope: TypedEnvelope<proto::CopyProjectEntry>,
1059 mut cx: AsyncApp,
1060 ) -> Result<proto::ProjectEntryResponse> {
1061 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1062 let worktree = this.update(&mut cx, |this, cx| {
1063 this.worktree_for_entry(entry_id, cx)
1064 .ok_or_else(|| anyhow!("worktree not found"))
1065 })??;
1066 Worktree::handle_copy_entry(worktree, envelope.payload, cx).await
1067 }
1068
1069 pub async fn handle_delete_project_entry(
1070 this: Entity<Self>,
1071 envelope: TypedEnvelope<proto::DeleteProjectEntry>,
1072 mut cx: AsyncApp,
1073 ) -> Result<proto::ProjectEntryResponse> {
1074 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1075 let worktree = this.update(&mut cx, |this, cx| {
1076 this.worktree_for_entry(entry_id, cx)
1077 .ok_or_else(|| anyhow!("worktree not found"))
1078 })??;
1079 Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
1080 }
1081
1082 pub async fn handle_expand_project_entry(
1083 this: Entity<Self>,
1084 envelope: TypedEnvelope<proto::ExpandProjectEntry>,
1085 mut cx: AsyncApp,
1086 ) -> Result<proto::ExpandProjectEntryResponse> {
1087 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1088 let worktree = this
1089 .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1090 .ok_or_else(|| anyhow!("invalid request"))?;
1091 Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
1092 }
1093
1094 pub async fn handle_expand_all_for_project_entry(
1095 this: Entity<Self>,
1096 envelope: TypedEnvelope<proto::ExpandAllForProjectEntry>,
1097 mut cx: AsyncApp,
1098 ) -> Result<proto::ExpandAllForProjectEntryResponse> {
1099 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1100 let worktree = this
1101 .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1102 .ok_or_else(|| anyhow!("invalid request"))?;
1103 Worktree::handle_expand_all_for_entry(worktree, envelope.payload, cx).await
1104 }
1105
1106 pub async fn handle_git_branches(
1107 this: Entity<Self>,
1108 branches: TypedEnvelope<proto::GitBranches>,
1109 cx: AsyncApp,
1110 ) -> Result<proto::GitBranchesResponse> {
1111 let project_path = branches
1112 .payload
1113 .repository
1114 .clone()
1115 .context("Invalid GitBranches call")?;
1116 let project_path = ProjectPath {
1117 worktree_id: WorktreeId::from_proto(project_path.worktree_id),
1118 path: Path::new(&project_path.path).into(),
1119 };
1120
1121 let branches = this
1122 .read_with(&cx, |this, cx| this.branches(project_path, cx))?
1123 .await?;
1124
1125 Ok(proto::GitBranchesResponse {
1126 branches: branches
1127 .into_iter()
1128 .map(|branch| proto::Branch {
1129 is_head: branch.is_head,
1130 name: branch.name.to_string(),
1131 unix_timestamp: branch.unix_timestamp.map(|timestamp| timestamp as u64),
1132 })
1133 .collect(),
1134 })
1135 }
1136
1137 pub async fn handle_update_branch(
1138 this: Entity<Self>,
1139 update_branch: TypedEnvelope<proto::UpdateGitBranch>,
1140 cx: AsyncApp,
1141 ) -> Result<proto::Ack> {
1142 let project_path = update_branch
1143 .payload
1144 .repository
1145 .clone()
1146 .context("Invalid GitBranches call")?;
1147 let project_path = ProjectPath {
1148 worktree_id: WorktreeId::from_proto(project_path.worktree_id),
1149 path: Path::new(&project_path.path).into(),
1150 };
1151 let new_branch = update_branch.payload.branch_name;
1152
1153 this.read_with(&cx, |this, cx| {
1154 this.update_or_create_branch(project_path, new_branch, cx)
1155 })?
1156 .await?;
1157
1158 Ok(proto::Ack {})
1159 }
1160}
1161
1162#[derive(Clone, Debug)]
1163enum WorktreeHandle {
1164 Strong(Entity<Worktree>),
1165 Weak(WeakEntity<Worktree>),
1166}
1167
1168impl WorktreeHandle {
1169 fn upgrade(&self) -> Option<Entity<Worktree>> {
1170 match self {
1171 WorktreeHandle::Strong(handle) => Some(handle.clone()),
1172 WorktreeHandle::Weak(handle) => handle.upgrade(),
1173 }
1174 }
1175}