use std::{
    io::{BufRead, BufReader},
    path::{Path, PathBuf},
    pin::pin,
    sync::{atomic::AtomicUsize, Arc},
};

use anyhow::{anyhow, Context as _, Result};
use collections::{HashMap, HashSet};
use fs::Fs;
use futures::{
    future::{BoxFuture, Shared},
    FutureExt, SinkExt,
};
use git::repository::Branch;
use gpui::{App, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity};
use postage::oneshot;
use rpc::{
    proto::{self, FromProto, ToProto, SSH_PROJECT_ID},
    AnyProtoClient, ErrorExt, TypedEnvelope,
};
use smol::{
    channel::{Receiver, Sender},
    stream::StreamExt,
};
use text::ReplicaId;
use util::{paths::SanitizedPath, ResultExt};
use worktree::{
    branch_to_proto, Entry, ProjectEntryId, UpdatedEntriesSet, Worktree, WorktreeId,
    WorktreeSettings,
};

use crate::{search::SearchQuery, ProjectPath};
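
/// A candidate file produced while scanning worktrees for search candidates,
/// handed to the filter workers along with a channel on which to send the path
/// back if the file's contents match the search query.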
struct MatchingEntry {
    worktree_path: Arc<Path>,
    path: ProjectPath,
    respond: oneshot::Sender<ProjectPath>,
}
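
/// Whether the store operates directly on the local filesystem or forwards
/// worktree operations to an upstream project over RPC.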
enum WorktreeStoreState {
    Local {
        fs: Arc<dyn Fs>,
    },
    Remote {
        upstream_client: AnyProtoClient,
        upstream_project_id: u64,
    },
}
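
/// Tracks the worktrees that belong to a project: their creation, removal,
/// ordering, and replication to and from remote peers.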
pub struct WorktreeStore {
    next_entry_id: Arc<AtomicUsize>,
    downstream_client: Option<(AnyProtoClient, u64)>,
    retain_worktrees: bool,
    worktrees: Vec<WorktreeHandle>,
    worktrees_reordered: bool,
    #[allow(clippy::type_complexity)]
    loading_worktrees:
        HashMap<SanitizedPath, Shared<Task<Result<Entity<Worktree>, Arc<anyhow::Error>>>>>,
    state: WorktreeStoreState,
}

#[derive(Debug)]
pub enum WorktreeStoreEvent {
    WorktreeAdded(Entity<Worktree>),
    WorktreeRemoved(EntityId, WorktreeId),
    WorktreeReleased(EntityId, WorktreeId),
    WorktreeOrderChanged,
    WorktreeUpdateSent(Entity<Worktree>),
    WorktreeUpdatedEntries(WorktreeId, UpdatedEntriesSet),
    WorktreeUpdatedGitRepositories(WorktreeId),
    WorktreeDeletedEntry(WorktreeId, ProjectEntryId),
}

impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}

impl WorktreeStore {
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_request_handler(Self::handle_create_project_entry);
        client.add_entity_request_handler(Self::handle_copy_project_entry);
        client.add_entity_request_handler(Self::handle_delete_project_entry);
        client.add_entity_request_handler(Self::handle_expand_project_entry);
        client.add_entity_request_handler(Self::handle_expand_all_for_project_entry);
        client.add_entity_request_handler(Self::handle_git_branches);
        client.add_entity_request_handler(Self::handle_update_branch);
    }

    pub fn local(retain_worktrees: bool, fs: Arc<dyn Fs>) -> Self {
        Self {
            next_entry_id: Default::default(),
            loading_worktrees: Default::default(),
            downstream_client: None,
            worktrees: Vec::new(),
            worktrees_reordered: false,
            retain_worktrees,
            state: WorktreeStoreState::Local { fs },
        }
    }

    pub fn remote(
        retain_worktrees: bool,
        upstream_client: AnyProtoClient,
        upstream_project_id: u64,
    ) -> Self {
        Self {
            next_entry_id: Default::default(),
            loading_worktrees: Default::default(),
            downstream_client: None,
            worktrees: Vec::new(),
            worktrees_reordered: false,
            retain_worktrees,
            state: WorktreeStoreState::Remote {
                upstream_client,
                upstream_project_id,
            },
        }
    }

    /// Iterates through all worktrees, including ones that don't appear in the project panel
    pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Entity<Worktree>> {
        self.worktrees
            .iter()
            .filter_map(move |worktree| worktree.upgrade())
    }

    /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
    pub fn visible_worktrees<'a>(
        &'a self,
        cx: &'a App,
    ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
        self.worktrees()
            .filter(|worktree| worktree.read(cx).is_visible())
    }

    pub fn worktree_for_id(&self, id: WorktreeId, cx: &App) -> Option<Entity<Worktree>> {
        self.worktrees()
            .find(|worktree| worktree.read(cx).id() == id)
    }

    pub fn current_branch(&self, repository: ProjectPath, cx: &App) -> Option<Branch> {
        self.worktree_for_id(repository.worktree_id, cx)?
            .read(cx)
            .git_entry(repository.path)?
            .branch()
            .cloned()
    }

    pub fn worktree_for_entry(
        &self,
        entry_id: ProjectEntryId,
        cx: &App,
    ) -> Option<Entity<Worktree>> {
        self.worktrees()
            .find(|worktree| worktree.read(cx).contains_entry(entry_id))
    }

    pub fn find_worktree(
        &self,
        abs_path: impl Into<SanitizedPath>,
        cx: &App,
    ) -> Option<(Entity<Worktree>, PathBuf)> {
        let abs_path: SanitizedPath = abs_path.into();
        for tree in self.worktrees() {
            if let Ok(relative_path) = abs_path.as_path().strip_prefix(tree.read(cx).abs_path()) {
                return Some((tree.clone(), relative_path.into()));
            }
        }
        None
    }

    pub fn find_or_create_worktree(
        &mut self,
        abs_path: impl AsRef<Path>,
        visible: bool,
        cx: &mut Context<Self>,
    ) -> Task<Result<(Entity<Worktree>, PathBuf)>> {
        let abs_path = abs_path.as_ref();
        if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
            Task::ready(Ok((tree, relative_path)))
        } else {
            let worktree = self.create_worktree(abs_path, visible, cx);
            cx.background_executor()
                .spawn(async move { Ok((worktree.await?, PathBuf::new())) })
        }
    }

    pub fn entry_for_id<'a>(&'a self, entry_id: ProjectEntryId, cx: &'a App) -> Option<&'a Entry> {
        self.worktrees()
            .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
    }

    pub fn worktree_and_entry_for_id<'a>(
        &'a self,
        entry_id: ProjectEntryId,
        cx: &'a App,
    ) -> Option<(Entity<Worktree>, &'a Entry)> {
        self.worktrees().find_map(|worktree| {
            worktree
                .read(cx)
                .entry_for_id(entry_id)
                .map(|e| (worktree.clone(), e))
        })
    }

    pub fn entry_for_path(&self, path: &ProjectPath, cx: &App) -> Option<Entry> {
        self.worktree_for_id(path.worktree_id, cx)?
            .read(cx)
            .entry_for_path(&path.path)
            .cloned()
    }
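
    /// Creates a worktree for `abs_path`, either locally or on the upstream
    /// host, deduplicating concurrent requests for the same path through the
    /// `loading_worktrees` map.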
    pub fn create_worktree(
        &mut self,
        abs_path: impl Into<SanitizedPath>,
        visible: bool,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Worktree>>> {
        let abs_path: SanitizedPath = abs_path.into();
        if !self.loading_worktrees.contains_key(&abs_path) {
            let task = match &self.state {
                WorktreeStoreState::Remote {
                    upstream_client, ..
                } => {
                    if upstream_client.is_via_collab() {
                        Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
                    } else {
                        self.create_ssh_worktree(
                            upstream_client.clone(),
                            abs_path.clone(),
                            visible,
                            cx,
                        )
                    }
                }
                WorktreeStoreState::Local { fs } => {
                    self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
                }
            };

            self.loading_worktrees
                .insert(abs_path.clone(), task.shared());
        }
        let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
        cx.spawn(|this, mut cx| async move {
            let result = task.await;
            this.update(&mut cx, |this, _| this.loading_worktrees.remove(&abs_path))
                .ok();
            match result {
                Ok(worktree) => Ok(worktree),
                Err(err) => Err((*err).cloned()),
            }
        })
    }
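
    /// Asks the remote (SSH) host to add a worktree for `abs_path` and mirrors
    /// the result locally as a remote worktree.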
    fn create_ssh_worktree(
        &mut self,
        client: AnyProtoClient,
        abs_path: impl Into<SanitizedPath>,
        visible: bool,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
        let mut abs_path = Into::<SanitizedPath>::into(abs_path).to_string();
        // If the path starts with `/~`, the ssh path was something like `ssh://user@host/~/home-dir-folder/`,
        // in which case we want to strip the leading `/`.
        // On the host side, the `~` will get expanded.
        // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
        if abs_path.starts_with("/~") {
            abs_path = abs_path[1..].to_string();
        }
        if abs_path.is_empty() || abs_path == "/" {
            abs_path = "~/".to_string();
        }
        cx.spawn(|this, mut cx| async move {
            let this = this.upgrade().context("Dropped worktree store")?;

            let path = Path::new(abs_path.as_str());
            let response = client
                .request(proto::AddWorktree {
                    project_id: SSH_PROJECT_ID,
                    path: path.to_proto(),
                    visible,
                })
                .await?;

            if let Some(existing_worktree) = this.read_with(&cx, |this, cx| {
                this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
            })? {
                return Ok(existing_worktree);
            }

            let root_path_buf = PathBuf::from_proto(response.canonicalized_path.clone());
            let root_name = root_path_buf
                .file_name()
                .map(|n| n.to_string_lossy().to_string())
                .unwrap_or(root_path_buf.to_string_lossy().to_string());

            let worktree = cx.update(|cx| {
                Worktree::remote(
                    SSH_PROJECT_ID,
                    0,
                    proto::WorktreeMetadata {
                        id: response.worktree_id,
                        root_name,
                        visible,
                        abs_path: response.canonicalized_path,
                    },
                    client,
                    cx,
                )
            })?;

            this.update(&mut cx, |this, cx| {
                this.add(&worktree, cx);
            })?;
            Ok(worktree)
        })
    }
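
    /// Creates a worktree by scanning `abs_path` on the local filesystem,
    /// registering it with the store and, for visible worktrees, adding it to
    /// the platform's recent-documents list.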
    fn create_local_worktree(
        &mut self,
        fs: Arc<dyn Fs>,
        abs_path: impl Into<SanitizedPath>,
        visible: bool,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
        let next_entry_id = self.next_entry_id.clone();
        let path: SanitizedPath = abs_path.into();

        cx.spawn(move |this, mut cx| async move {
            let worktree = Worktree::local(path.clone(), visible, fs, next_entry_id, &mut cx).await;

            let worktree = worktree?;

            this.update(&mut cx, |this, cx| this.add(&worktree, cx))?;

            if visible {
                cx.update(|cx| {
                    cx.add_recent_document(path.as_path());
                })
                .log_err();
            }

            Ok(worktree)
        })
    }
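
    /// Registers a worktree with the store, keeping the list ordered by
    /// absolute path unless the worktrees have been manually reordered, and
    /// re-emits the worktree's events as `WorktreeStoreEvent`s.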
    pub fn add(&mut self, worktree: &Entity<Worktree>, cx: &mut Context<Self>) {
        let worktree_id = worktree.read(cx).id();
        debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));

        let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
        let handle = if push_strong_handle {
            WorktreeHandle::Strong(worktree.clone())
        } else {
            WorktreeHandle::Weak(worktree.downgrade())
        };
        if self.worktrees_reordered {
            self.worktrees.push(handle);
        } else {
            let i = match self
                .worktrees
                .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
                    other.upgrade().map(|worktree| worktree.read(cx).abs_path())
                }) {
                Ok(i) | Err(i) => i,
            };
            self.worktrees.insert(i, handle);
        }

        cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
        self.send_project_updates(cx);

        let handle_id = worktree.entity_id();
        cx.subscribe(worktree, |_, worktree, event, cx| {
            let worktree_id = worktree.update(cx, |worktree, _| worktree.id());
            match event {
                worktree::Event::UpdatedEntries(changes) => {
                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedEntries(
                        worktree.read(cx).id(),
                        changes.clone(),
                    ));
                }
                worktree::Event::UpdatedGitRepositories(_) => {
                    cx.emit(WorktreeStoreEvent::WorktreeUpdatedGitRepositories(
                        worktree_id,
                    ));
                }
                worktree::Event::DeletedEntry(id) => {
                    cx.emit(WorktreeStoreEvent::WorktreeDeletedEntry(worktree_id, *id))
                }
            }
        })
        .detach();
        cx.observe_release(worktree, move |this, worktree, cx| {
            cx.emit(WorktreeStoreEvent::WorktreeReleased(
                handle_id,
                worktree.id(),
            ));
            cx.emit(WorktreeStoreEvent::WorktreeRemoved(
                handle_id,
                worktree.id(),
            ));
            this.send_project_updates(cx);
        })
        .detach();
    }

    pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut Context<Self>) {
        self.worktrees.retain(|worktree| {
            if let Some(worktree) = worktree.upgrade() {
                if worktree.read(cx).id() == id_to_remove {
                    cx.emit(WorktreeStoreEvent::WorktreeRemoved(
                        worktree.entity_id(),
                        id_to_remove,
                    ));
                    false
                } else {
                    true
                }
            } else {
                false
            }
        });
        self.send_project_updates(cx);
    }

    pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
        self.worktrees_reordered = worktrees_reordered;
    }

    fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
        match &self.state {
            WorktreeStoreState::Remote {
                upstream_client,
                upstream_project_id,
                ..
            } => Some((upstream_client.clone(), *upstream_project_id)),
            WorktreeStoreState::Local { .. } => None,
        }
    }

    pub fn set_worktrees_from_proto(
        &mut self,
        worktrees: Vec<proto::WorktreeMetadata>,
        replica_id: ReplicaId,
        cx: &mut Context<Self>,
    ) -> Result<()> {
        let mut old_worktrees_by_id = self
            .worktrees
            .drain(..)
            .filter_map(|worktree| {
                let worktree = worktree.upgrade()?;
                Some((worktree.read(cx).id(), worktree))
            })
            .collect::<HashMap<_, _>>();

        let (client, project_id) = self
            .upstream_client()
            .clone()
            .ok_or_else(|| anyhow!("invalid project"))?;

        for worktree in worktrees {
            if let Some(old_worktree) =
                old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
            {
                let push_strong_handle =
                    self.retain_worktrees || old_worktree.read(cx).is_visible();
                let handle = if push_strong_handle {
                    WorktreeHandle::Strong(old_worktree.clone())
                } else {
                    WorktreeHandle::Weak(old_worktree.downgrade())
                };
                self.worktrees.push(handle);
            } else {
                self.add(
                    &Worktree::remote(project_id, replica_id, worktree, client.clone(), cx),
                    cx,
                );
            }
        }
        self.send_project_updates(cx);

        Ok(())
    }

    pub fn move_worktree(
        &mut self,
        source: WorktreeId,
        destination: WorktreeId,
        cx: &mut Context<Self>,
    ) -> Result<()> {
        if source == destination {
            return Ok(());
        }

        let mut source_index = None;
        let mut destination_index = None;
        for (i, worktree) in self.worktrees.iter().enumerate() {
            if let Some(worktree) = worktree.upgrade() {
                let worktree_id = worktree.read(cx).id();
                if worktree_id == source {
                    source_index = Some(i);
                    if destination_index.is_some() {
                        break;
                    }
                } else if worktree_id == destination {
                    destination_index = Some(i);
                    if source_index.is_some() {
                        break;
                    }
                }
            }
        }

        let source_index =
            source_index.with_context(|| format!("Missing worktree for id {source}"))?;
        let destination_index =
            destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;

        if source_index == destination_index {
            return Ok(());
        }

        let worktree_to_move = self.worktrees.remove(source_index);
        self.worktrees.insert(destination_index, worktree_to_move);
        self.worktrees_reordered = true;
        cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
        cx.notify();
        Ok(())
    }

    pub fn disconnected_from_host(&mut self, cx: &mut App) {
        for worktree in &self.worktrees {
            if let Some(worktree) = worktree.upgrade() {
                worktree.update(cx, |worktree, _| {
                    if let Some(worktree) = worktree.as_remote_mut() {
                        worktree.disconnected_from_host();
                    }
                });
            }
        }
    }
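
    /// Sends the current worktree metadata to the downstream client, if any,
    /// and starts observing each worktree so that subsequent updates are
    /// forwarded as well.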
    pub fn send_project_updates(&mut self, cx: &mut Context<Self>) {
        let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
            return;
        };

        let update = proto::UpdateProject {
            project_id,
            worktrees: self.worktree_metadata_protos(cx),
        };

        // collab has bad concurrency guarantees, so we send requests in serial.
        let update_project = if downstream_client.is_via_collab() {
            Some(downstream_client.request(update))
        } else {
            downstream_client.send(update).log_err();
            None
        };
        cx.spawn(|this, mut cx| async move {
            if let Some(update_project) = update_project {
                update_project.await?;
            }

            this.update(&mut cx, |this, cx| {
                let worktrees = this.worktrees().collect::<Vec<_>>();

                for worktree in worktrees {
                    worktree.update(cx, |worktree, cx| {
                        let client = downstream_client.clone();
                        worktree.observe_updates(project_id, cx, {
                            move |update| {
                                let client = client.clone();
                                async move {
                                    if client.is_via_collab() {
                                        client
                                            .request(update)
                                            .map(|result| result.log_err().is_some())
                                            .await
                                    } else {
                                        client.send(update).log_err().is_some()
                                    }
                                }
                            }
                        });
                    });

                    cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
                }

                anyhow::Ok(())
            })
        })
        .detach_and_log_err(cx);
    }

    pub fn worktree_metadata_protos(&self, cx: &App) -> Vec<proto::WorktreeMetadata> {
        self.worktrees()
            .map(|worktree| {
                let worktree = worktree.read(cx);
                proto::WorktreeMetadata {
                    id: worktree.id().to_proto(),
                    root_name: worktree.root_name().into(),
                    visible: worktree.is_visible(),
                    abs_path: worktree.abs_path().to_proto(),
                }
            })
            .collect()
    }
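
    /// Marks the project as shared: all worktrees are retained with strong
    /// handles and updates begin streaming to `downstream_client`.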
    pub fn shared(
        &mut self,
        remote_id: u64,
        downstream_client: AnyProtoClient,
        cx: &mut Context<Self>,
    ) {
        self.retain_worktrees = true;
        self.downstream_client = Some((downstream_client, remote_id));

        // When shared, retain all worktrees
        for worktree_handle in self.worktrees.iter_mut() {
            match worktree_handle {
                WorktreeHandle::Strong(_) => {}
                WorktreeHandle::Weak(worktree) => {
                    if let Some(worktree) = worktree.upgrade() {
                        *worktree_handle = WorktreeHandle::Strong(worktree);
                    }
                }
            }
        }
        self.send_project_updates(cx);
    }
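
    /// Reverses `shared`: stops observing worktree updates and drops strong
    /// handles to worktrees that are not user-visible.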
    pub fn unshared(&mut self, cx: &mut Context<Self>) {
        self.retain_worktrees = false;
        self.downstream_client.take();

        // When not shared, only retain the visible worktrees
        for worktree_handle in self.worktrees.iter_mut() {
            if let WorktreeHandle::Strong(worktree) = worktree_handle {
                let is_visible = worktree.update(cx, |worktree, _| {
                    worktree.stop_observing_updates();
                    worktree.is_visible()
                });
                if !is_visible {
                    *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
                }
            }
        }
    }

    /// Searches all worktrees and returns the paths of files that *might* match the search.
    pub fn find_search_candidates(
        &self,
        query: SearchQuery,
        limit: usize,
        open_entries: HashSet<ProjectEntryId>,
        fs: Arc<dyn Fs>,
        cx: &Context<Self>,
    ) -> Receiver<ProjectPath> {
        let snapshots = self
            .visible_worktrees(cx)
            .filter_map(|tree| {
                let tree = tree.read(cx);
                Some((tree.snapshot(), tree.as_local()?.settings()))
            })
            .collect::<Vec<_>>();

        let executor = cx.background_executor().clone();

        // We want to return entries in the order they are in the worktrees, so we have one
        // thread that iterates over the worktrees (and ignored directories) as necessary,
        // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
        // channel.
        // We spawn a number of workers that take items from the filter channel and check the query
        // against the version of the file on disk.
        let (filter_tx, filter_rx) = smol::channel::bounded(64);
        let (output_tx, output_rx) = smol::channel::bounded(64);
        let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();

        let input = cx.background_executor().spawn({
            let fs = fs.clone();
            let query = query.clone();
            async move {
                Self::find_candidate_paths(
                    fs,
                    snapshots,
                    open_entries,
                    query,
                    filter_tx,
                    output_tx,
                )
                .await
                .log_err();
            }
        });
        const MAX_CONCURRENT_FILE_SCANS: usize = 64;
        let filters = cx.background_executor().spawn(async move {
            let fs = &fs;
            let query = &query;
            executor
                .scoped(move |scope| {
                    for _ in 0..MAX_CONCURRENT_FILE_SCANS {
                        let filter_rx = filter_rx.clone();
                        scope.spawn(async move {
                            Self::filter_paths(fs, filter_rx, query)
                                .await
                                .log_with_level(log::Level::Debug);
                        })
                    }
                })
                .await;
        });
        cx.background_executor()
            .spawn(async move {
                let mut matched = 0;
                while let Ok(mut receiver) = output_rx.recv().await {
                    let Some(path) = receiver.next().await else {
                        continue;
                    };
                    let Ok(_) = matching_paths_tx.send(path).await else {
                        break;
                    };
                    matched += 1;
                    if matched == limit {
                        break;
                    }
                }
                drop(input);
                drop(filters);
            })
            .detach();
        matching_paths_rx
    }
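
    /// Recursively walks an ignored directory on disk (its contents are not
    /// loaded into the worktree snapshot), feeding candidate files to the
    /// filter workers in sorted order.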
    fn scan_ignored_dir<'a>(
        fs: &'a Arc<dyn Fs>,
        snapshot: &'a worktree::Snapshot,
        path: &'a Path,
        query: &'a SearchQuery,
        include_root: bool,
        filter_tx: &'a Sender<MatchingEntry>,
        output_tx: &'a Sender<oneshot::Receiver<ProjectPath>>,
    ) -> BoxFuture<'a, Result<()>> {
        async move {
            let abs_path = snapshot.abs_path().join(path);
            let Some(mut files) = fs
                .read_dir(&abs_path)
                .await
                .with_context(|| format!("listing ignored path {abs_path:?}"))
                .log_err()
            else {
                return Ok(());
            };

            let mut results = Vec::new();

            while let Some(Ok(file)) = files.next().await {
                let Some(metadata) = fs
                    .metadata(&file)
                    .await
                    .with_context(|| format!("fetching fs metadata for {abs_path:?}"))
                    .log_err()
                    .flatten()
                else {
                    continue;
                };
                if metadata.is_symlink || metadata.is_fifo {
                    continue;
                }
                results.push((
                    file.strip_prefix(snapshot.abs_path())?.to_path_buf(),
                    !metadata.is_dir,
                ))
            }
            results.sort_by(|(a_path, _), (b_path, _)| a_path.cmp(b_path));
            for (path, is_file) in results {
                if is_file {
                    if query.filters_path() {
                        let matched_path = if include_root {
                            let mut full_path = PathBuf::from(snapshot.root_name());
                            full_path.push(&path);
                            query.file_matches(&full_path)
                        } else {
                            query.file_matches(&path)
                        };
                        if !matched_path {
                            continue;
                        }
                    }
                    let (tx, rx) = oneshot::channel();
                    output_tx.send(rx).await?;
                    filter_tx
                        .send(MatchingEntry {
                            respond: tx,
                            worktree_path: snapshot.abs_path().clone(),
                            path: ProjectPath {
                                worktree_id: snapshot.id(),
                                path: Arc::from(path),
                            },
                        })
                        .await?;
                } else {
                    Self::scan_ignored_dir(
                        fs,
                        snapshot,
                        &path,
                        query,
                        include_root,
                        filter_tx,
                        output_tx,
                    )
                    .await?;
                }
            }
            Ok(())
        }
        .boxed()
    }
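
    /// Produces the ordered stream of candidate paths for a search: walks each
    /// worktree snapshot, applies the query's path filters, answers already
    /// open entries immediately, and hands everything else to the filter
    /// workers for a content check.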
    async fn find_candidate_paths(
        fs: Arc<dyn Fs>,
        snapshots: Vec<(worktree::Snapshot, WorktreeSettings)>,
        open_entries: HashSet<ProjectEntryId>,
        query: SearchQuery,
        filter_tx: Sender<MatchingEntry>,
        output_tx: Sender<oneshot::Receiver<ProjectPath>>,
    ) -> Result<()> {
        let include_root = snapshots.len() > 1;
        for (snapshot, settings) in snapshots {
            for entry in snapshot.entries(query.include_ignored(), 0) {
                if entry.is_dir() && entry.is_ignored {
                    if !settings.is_path_excluded(&entry.path) {
                        Self::scan_ignored_dir(
                            &fs,
                            &snapshot,
                            &entry.path,
                            &query,
                            include_root,
                            &filter_tx,
                            &output_tx,
                        )
                        .await?;
                    }
                    continue;
                }

                if entry.is_fifo || !entry.is_file() {
                    continue;
                }

                if query.filters_path() {
                    let matched_path = if include_root {
                        let mut full_path = PathBuf::from(snapshot.root_name());
                        full_path.push(&entry.path);
                        query.file_matches(&full_path)
                    } else {
                        query.file_matches(&entry.path)
                    };
                    if !matched_path {
                        continue;
                    }
                }

                let (mut tx, rx) = oneshot::channel();

                if open_entries.contains(&entry.id) {
                    tx.send(ProjectPath {
                        worktree_id: snapshot.id(),
                        path: entry.path.clone(),
                    })
                    .await?;
                } else {
                    filter_tx
                        .send(MatchingEntry {
                            respond: tx,
                            worktree_path: snapshot.abs_path().clone(),
                            path: ProjectPath {
                                worktree_id: snapshot.id(),
                                path: entry.path.clone(),
                            },
                        })
                        .await?;
                }

                output_tx.send(rx).await?;
            }
        }
        Ok(())
    }

    pub fn branches(
        &self,
        project_path: ProjectPath,
        cx: &App,
    ) -> Task<Result<Vec<git::repository::Branch>>> {
        let Some(worktree) = self.worktree_for_id(project_path.worktree_id, cx) else {
            return Task::ready(Err(anyhow!("No worktree found for ProjectPath")));
        };

        match worktree.read(cx) {
            Worktree::Local(local_worktree) => {
                let branches = util::maybe!({
                    let worktree_error = |error| {
                        format!(
                            "{} for worktree {}",
                            error,
                            local_worktree.abs_path().to_string_lossy()
                        )
                    };

                    let entry = local_worktree
                        .git_entry(project_path.path)
                        .with_context(|| worktree_error("No git entry found"))?;

                    let repo = local_worktree
                        .get_local_repo(&entry)
                        .with_context(|| worktree_error("No repository found"))?
                        .repo()
                        .clone();

                    repo.branches()
                });

                Task::ready(branches)
            }
            Worktree::Remote(remote_worktree) => {
                let request = remote_worktree.client().request(proto::GitBranches {
                    project_id: remote_worktree.project_id(),
                    repository: Some(proto::ProjectPath {
                        worktree_id: project_path.worktree_id.to_proto(),
                        path: project_path.path.to_proto(), // Root path
                    }),
                });

                cx.background_executor().spawn(async move {
                    let response = request.await?;

                    let branches = response
                        .branches
                        .into_iter()
                        .map(|proto_branch| git::repository::Branch {
                            is_head: proto_branch.is_head,
                            name: proto_branch.name.into(),
                            upstream: proto_branch.upstream.map(|upstream| {
                                git::repository::Upstream {
                                    ref_name: upstream.ref_name.into(),
                                    tracking: upstream.tracking.map(|tracking| {
                                        git::repository::UpstreamTracking {
                                            ahead: tracking.ahead as u32,
                                            behind: tracking.behind as u32,
                                        }
                                    }),
                                }
                            }),
                            most_recent_commit: proto_branch.most_recent_commit.map(|commit| {
                                git::repository::CommitSummary {
                                    sha: commit.sha.into(),
                                    subject: commit.subject.into(),
                                    commit_timestamp: commit.commit_timestamp,
                                }
                            }),
                        })
                        .collect();

                    Ok(branches)
                })
            }
        }
    }

    pub fn update_or_create_branch(
        &self,
        repository: ProjectPath,
        new_branch: String,
        cx: &App,
    ) -> Task<Result<()>> {
        let Some(worktree) = self.worktree_for_id(repository.worktree_id, cx) else {
            return Task::ready(Err(anyhow!("No worktree found for ProjectPath")));
        };

        match worktree.read(cx) {
            Worktree::Local(local_worktree) => {
                let result = util::maybe!({
                    let worktree_error = |error| {
                        format!(
                            "{} for worktree {}",
                            error,
                            local_worktree.abs_path().to_string_lossy()
                        )
                    };

                    let entry = local_worktree
                        .git_entry(repository.path)
                        .with_context(|| worktree_error("No git entry found"))?;

                    let repo = local_worktree
                        .get_local_repo(&entry)
                        .with_context(|| worktree_error("No repository found"))?
                        .repo()
                        .clone();

                    if !repo.branch_exits(&new_branch)? {
                        repo.create_branch(&new_branch)?;
                    }

                    repo.change_branch(&new_branch)?;
                    Ok(())
                });

                Task::ready(result)
            }
            Worktree::Remote(remote_worktree) => {
                let request = remote_worktree.client().request(proto::UpdateGitBranch {
                    project_id: remote_worktree.project_id(),
                    repository: Some(proto::ProjectPath {
                        worktree_id: repository.worktree_id.to_proto(),
                        path: repository.path.to_proto(), // Root path
                    }),
                    branch_name: new_branch,
                });

                cx.background_executor().spawn(async move {
                    request.await?;
                    Ok(())
                })
            }
        }
    }
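
    /// Worker loop for `find_search_candidates`: reads each candidate file
    /// from disk, skips files whose initial bytes are not valid UTF-8, and
    /// reports the path back when the query matches the file's contents.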
    async fn filter_paths(
        fs: &Arc<dyn Fs>,
        mut input: Receiver<MatchingEntry>,
        query: &SearchQuery,
    ) -> Result<()> {
        let mut input = pin!(input);
        while let Some(mut entry) = input.next().await {
            let abs_path = entry.worktree_path.join(&entry.path.path);
            let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
                continue;
            };

            let mut file = BufReader::new(file);
            let file_start = file.fill_buf()?;

            if let Err(Some(starting_position)) =
                std::str::from_utf8(file_start).map_err(|e| e.error_len())
            {
                // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
                // that way we can still match files in a streaming fashion without having to look at "obviously binary" files.
                log::debug!(
                    "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
                );
                continue;
            }

            if query.detect(file).unwrap_or(false) {
                entry.respond.send(entry.path).await?
            }
        }

        Ok(())
    }

    pub async fn handle_create_project_entry(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::CreateProjectEntry>,
        mut cx: AsyncApp,
    ) -> Result<proto::ProjectEntryResponse> {
        let worktree = this.update(&mut cx, |this, cx| {
            let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
            this.worktree_for_id(worktree_id, cx)
                .ok_or_else(|| anyhow!("worktree not found"))
        })??;
        Worktree::handle_create_entry(worktree, envelope.payload, cx).await
    }

    pub async fn handle_copy_project_entry(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::CopyProjectEntry>,
        mut cx: AsyncApp,
    ) -> Result<proto::ProjectEntryResponse> {
        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
        let worktree = this.update(&mut cx, |this, cx| {
            this.worktree_for_entry(entry_id, cx)
                .ok_or_else(|| anyhow!("worktree not found"))
        })??;
        Worktree::handle_copy_entry(worktree, envelope.payload, cx).await
    }

    pub async fn handle_delete_project_entry(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
        mut cx: AsyncApp,
    ) -> Result<proto::ProjectEntryResponse> {
        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
        let worktree = this.update(&mut cx, |this, cx| {
            this.worktree_for_entry(entry_id, cx)
                .ok_or_else(|| anyhow!("worktree not found"))
        })??;
        Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
    }

    pub async fn handle_expand_project_entry(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::ExpandProjectEntry>,
        mut cx: AsyncApp,
    ) -> Result<proto::ExpandProjectEntryResponse> {
        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
        let worktree = this
            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
            .ok_or_else(|| anyhow!("invalid request"))?;
        Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
    }

    pub async fn handle_expand_all_for_project_entry(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::ExpandAllForProjectEntry>,
        mut cx: AsyncApp,
    ) -> Result<proto::ExpandAllForProjectEntryResponse> {
        let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
        let worktree = this
            .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
            .ok_or_else(|| anyhow!("invalid request"))?;
        Worktree::handle_expand_all_for_entry(worktree, envelope.payload, cx).await
    }

    pub async fn handle_git_branches(
        this: Entity<Self>,
        branches: TypedEnvelope<proto::GitBranches>,
        cx: AsyncApp,
    ) -> Result<proto::GitBranchesResponse> {
        let project_path = branches
            .payload
            .repository
            .clone()
            .context("Invalid GitBranches call")?;
        let project_path = ProjectPath {
            worktree_id: WorktreeId::from_proto(project_path.worktree_id),
            path: Arc::<Path>::from_proto(project_path.path),
        };

        let branches = this
            .read_with(&cx, |this, cx| this.branches(project_path, cx))?
            .await?;

        Ok(proto::GitBranchesResponse {
            branches: branches.iter().map(branch_to_proto).collect(),
        })
    }

    pub async fn handle_update_branch(
        this: Entity<Self>,
        update_branch: TypedEnvelope<proto::UpdateGitBranch>,
        cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let project_path = update_branch
            .payload
            .repository
            .clone()
            .context("Invalid GitBranches call")?;
        let project_path = ProjectPath {
            worktree_id: WorktreeId::from_proto(project_path.worktree_id),
            path: Arc::<Path>::from_proto(project_path.path),
        };
        let new_branch = update_branch.payload.branch_name;

        this.read_with(&cx, |this, cx| {
            this.update_or_create_branch(project_path, new_branch, cx)
        })?
        .await?;

        Ok(proto::Ack {})
    }
}
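
/// A handle that either keeps its worktree alive (`Strong`) or merely
/// observes it (`Weak`). Non-visible worktrees are held weakly when the
/// project is neither shared nor retaining worktrees.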
#[derive(Clone, Debug)]
enum WorktreeHandle {
    Strong(Entity<Worktree>),
    Weak(WeakEntity<Worktree>),
}

impl WorktreeHandle {
    fn upgrade(&self) -> Option<Entity<Worktree>> {
        match self {
            WorktreeHandle::Strong(handle) => Some(handle.clone()),
            WorktreeHandle::Weak(handle) => handle.upgrade(),
        }
    }
}