1use std::{
2 io::{BufRead, BufReader},
3 path::{Path, PathBuf},
4 pin::pin,
5 sync::{atomic::AtomicUsize, Arc},
6};
7
8use anyhow::{anyhow, Context as _, Result};
9use collections::{HashMap, HashSet};
10use fs::Fs;
11use futures::{
12 future::{BoxFuture, Shared},
13 FutureExt, SinkExt,
14};
15use git::repository::Branch;
16use gpui::{
17 App, AppContext as _, AsyncApp, Context, Entity, EntityId, EventEmitter, Task, WeakEntity,
18};
19use postage::oneshot;
20use rpc::{
21 proto::{self, FromProto, ToProto, SSH_PROJECT_ID},
22 AnyProtoClient, ErrorExt, TypedEnvelope,
23};
24use smol::{
25 channel::{Receiver, Sender},
26 stream::StreamExt,
27};
28use text::ReplicaId;
29use util::{paths::SanitizedPath, ResultExt};
30use worktree::{
31 branch_to_proto, Entry, ProjectEntryId, UpdatedEntriesSet, Worktree, WorktreeId,
32 WorktreeSettings,
33};
34
35use crate::{search::SearchQuery, ProjectPath};
36
37struct MatchingEntry {
38 worktree_path: Arc<Path>,
39 path: ProjectPath,
40 respond: oneshot::Sender<ProjectPath>,
41}
42
43enum WorktreeStoreState {
44 Local {
45 fs: Arc<dyn Fs>,
46 },
47 Remote {
48 upstream_client: AnyProtoClient,
49 upstream_project_id: u64,
50 },
51}
52
53pub struct WorktreeStore {
54 next_entry_id: Arc<AtomicUsize>,
55 downstream_client: Option<(AnyProtoClient, u64)>,
56 retain_worktrees: bool,
57 worktrees: Vec<WorktreeHandle>,
58 worktrees_reordered: bool,
59 #[allow(clippy::type_complexity)]
60 loading_worktrees:
61 HashMap<SanitizedPath, Shared<Task<Result<Entity<Worktree>, Arc<anyhow::Error>>>>>,
62 state: WorktreeStoreState,
63}
64
65#[derive(Debug)]
66pub enum WorktreeStoreEvent {
67 WorktreeAdded(Entity<Worktree>),
68 WorktreeRemoved(EntityId, WorktreeId),
69 WorktreeReleased(EntityId, WorktreeId),
70 WorktreeOrderChanged,
71 WorktreeUpdateSent(Entity<Worktree>),
72 WorktreeUpdatedEntries(WorktreeId, UpdatedEntriesSet),
73 WorktreeUpdatedGitRepositories(WorktreeId),
74 WorktreeDeletedEntry(WorktreeId, ProjectEntryId),
75}
76
77impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
78
79impl WorktreeStore {
80 pub fn init(client: &AnyProtoClient) {
81 client.add_entity_request_handler(Self::handle_create_project_entry);
82 client.add_entity_request_handler(Self::handle_copy_project_entry);
83 client.add_entity_request_handler(Self::handle_delete_project_entry);
84 client.add_entity_request_handler(Self::handle_expand_project_entry);
85 client.add_entity_request_handler(Self::handle_expand_all_for_project_entry);
86 client.add_entity_request_handler(Self::handle_git_branches);
87 client.add_entity_request_handler(Self::handle_update_branch);
88 }
89
90 pub fn local(retain_worktrees: bool, fs: Arc<dyn Fs>) -> Self {
91 Self {
92 next_entry_id: Default::default(),
93 loading_worktrees: Default::default(),
94 downstream_client: None,
95 worktrees: Vec::new(),
96 worktrees_reordered: false,
97 retain_worktrees,
98 state: WorktreeStoreState::Local { fs },
99 }
100 }
101
102 pub fn remote(
103 retain_worktrees: bool,
104 upstream_client: AnyProtoClient,
105 upstream_project_id: u64,
106 ) -> Self {
107 Self {
108 next_entry_id: Default::default(),
109 loading_worktrees: Default::default(),
110 downstream_client: None,
111 worktrees: Vec::new(),
112 worktrees_reordered: false,
113 retain_worktrees,
114 state: WorktreeStoreState::Remote {
115 upstream_client,
116 upstream_project_id,
117 },
118 }
119 }
120
121 /// Iterates through all worktrees, including ones that don't appear in the project panel
122 pub fn worktrees(&self) -> impl '_ + DoubleEndedIterator<Item = Entity<Worktree>> {
123 self.worktrees
124 .iter()
125 .filter_map(move |worktree| worktree.upgrade())
126 }
127
128 /// Iterates through all user-visible worktrees, the ones that appear in the project panel.
129 pub fn visible_worktrees<'a>(
130 &'a self,
131 cx: &'a App,
132 ) -> impl 'a + DoubleEndedIterator<Item = Entity<Worktree>> {
133 self.worktrees()
134 .filter(|worktree| worktree.read(cx).is_visible())
135 }
136
137 pub fn worktree_for_id(&self, id: WorktreeId, cx: &App) -> Option<Entity<Worktree>> {
138 self.worktrees()
139 .find(|worktree| worktree.read(cx).id() == id)
140 }
141
142 pub fn current_branch(&self, repository: ProjectPath, cx: &App) -> Option<Branch> {
143 self.worktree_for_id(repository.worktree_id, cx)?
144 .read(cx)
145 .git_entry(repository.path)?
146 .branch()
147 .cloned()
148 }
149
150 pub fn worktree_for_entry(
151 &self,
152 entry_id: ProjectEntryId,
153 cx: &App,
154 ) -> Option<Entity<Worktree>> {
155 self.worktrees()
156 .find(|worktree| worktree.read(cx).contains_entry(entry_id))
157 }
158
159 pub fn find_worktree(
160 &self,
161 abs_path: impl Into<SanitizedPath>,
162 cx: &App,
163 ) -> Option<(Entity<Worktree>, PathBuf)> {
164 let abs_path: SanitizedPath = abs_path.into();
165 for tree in self.worktrees() {
166 if let Ok(relative_path) = abs_path.as_path().strip_prefix(tree.read(cx).abs_path()) {
167 return Some((tree.clone(), relative_path.into()));
168 }
169 }
170 None
171 }
172
173 pub fn find_or_create_worktree(
174 &mut self,
175 abs_path: impl AsRef<Path>,
176 visible: bool,
177 cx: &mut Context<Self>,
178 ) -> Task<Result<(Entity<Worktree>, PathBuf)>> {
179 let abs_path = abs_path.as_ref();
180 if let Some((tree, relative_path)) = self.find_worktree(abs_path, cx) {
181 Task::ready(Ok((tree, relative_path)))
182 } else {
183 let worktree = self.create_worktree(abs_path, visible, cx);
184 cx.background_spawn(async move { Ok((worktree.await?, PathBuf::new())) })
185 }
186 }
187
188 pub fn entry_for_id<'a>(&'a self, entry_id: ProjectEntryId, cx: &'a App) -> Option<&'a Entry> {
189 self.worktrees()
190 .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id))
191 }
192
193 pub fn worktree_and_entry_for_id<'a>(
194 &'a self,
195 entry_id: ProjectEntryId,
196 cx: &'a App,
197 ) -> Option<(Entity<Worktree>, &'a Entry)> {
198 self.worktrees().find_map(|worktree| {
199 worktree
200 .read(cx)
201 .entry_for_id(entry_id)
202 .map(|e| (worktree.clone(), e))
203 })
204 }
205
206 pub fn entry_for_path(&self, path: &ProjectPath, cx: &App) -> Option<Entry> {
207 self.worktree_for_id(path.worktree_id, cx)?
208 .read(cx)
209 .entry_for_path(&path.path)
210 .cloned()
211 }
212
213 pub fn create_worktree(
214 &mut self,
215 abs_path: impl Into<SanitizedPath>,
216 visible: bool,
217 cx: &mut Context<Self>,
218 ) -> Task<Result<Entity<Worktree>>> {
219 let abs_path: SanitizedPath = abs_path.into();
220 if !self.loading_worktrees.contains_key(&abs_path) {
221 let task = match &self.state {
222 WorktreeStoreState::Remote {
223 upstream_client, ..
224 } => {
225 if upstream_client.is_via_collab() {
226 Task::ready(Err(Arc::new(anyhow!("cannot create worktrees via collab"))))
227 } else {
228 self.create_ssh_worktree(
229 upstream_client.clone(),
230 abs_path.clone(),
231 visible,
232 cx,
233 )
234 }
235 }
236 WorktreeStoreState::Local { fs } => {
237 self.create_local_worktree(fs.clone(), abs_path.clone(), visible, cx)
238 }
239 };
240
241 self.loading_worktrees
242 .insert(abs_path.clone(), task.shared());
243 }
244 let task = self.loading_worktrees.get(&abs_path).unwrap().clone();
245 cx.spawn(|this, mut cx| async move {
246 let result = task.await;
247 this.update(&mut cx, |this, _| this.loading_worktrees.remove(&abs_path))
248 .ok();
249 match result {
250 Ok(worktree) => Ok(worktree),
251 Err(err) => Err((*err).cloned()),
252 }
253 })
254 }
255
256 fn create_ssh_worktree(
257 &mut self,
258 client: AnyProtoClient,
259 abs_path: impl Into<SanitizedPath>,
260 visible: bool,
261 cx: &mut Context<Self>,
262 ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
263 let mut abs_path = Into::<SanitizedPath>::into(abs_path).to_string();
264 // If we start with `/~` that means the ssh path was something like `ssh://user@host/~/home-dir-folder/`
265 // in which case want to strip the leading the `/`.
266 // On the host-side, the `~` will get expanded.
267 // That's what git does too: https://github.com/libgit2/libgit2/issues/3345#issuecomment-127050850
268 if abs_path.starts_with("/~") {
269 abs_path = abs_path[1..].to_string();
270 }
271 if abs_path.is_empty() || abs_path == "/" {
272 abs_path = "~/".to_string();
273 }
274 cx.spawn(|this, mut cx| async move {
275 let this = this.upgrade().context("Dropped worktree store")?;
276
277 let path = Path::new(abs_path.as_str());
278 let response = client
279 .request(proto::AddWorktree {
280 project_id: SSH_PROJECT_ID,
281 path: path.to_proto(),
282 visible,
283 })
284 .await?;
285
286 if let Some(existing_worktree) = this.read_with(&cx, |this, cx| {
287 this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
288 })? {
289 return Ok(existing_worktree);
290 }
291
292 let root_path_buf = PathBuf::from_proto(response.canonicalized_path.clone());
293 let root_name = root_path_buf
294 .file_name()
295 .map(|n| n.to_string_lossy().to_string())
296 .unwrap_or(root_path_buf.to_string_lossy().to_string());
297
298 let worktree = cx.update(|cx| {
299 Worktree::remote(
300 SSH_PROJECT_ID,
301 0,
302 proto::WorktreeMetadata {
303 id: response.worktree_id,
304 root_name,
305 visible,
306 abs_path: response.canonicalized_path,
307 },
308 client,
309 cx,
310 )
311 })?;
312
313 this.update(&mut cx, |this, cx| {
314 this.add(&worktree, cx);
315 })?;
316 Ok(worktree)
317 })
318 }
319
320 fn create_local_worktree(
321 &mut self,
322 fs: Arc<dyn Fs>,
323 abs_path: impl Into<SanitizedPath>,
324 visible: bool,
325 cx: &mut Context<Self>,
326 ) -> Task<Result<Entity<Worktree>, Arc<anyhow::Error>>> {
327 let next_entry_id = self.next_entry_id.clone();
328 let path: SanitizedPath = abs_path.into();
329
330 cx.spawn(move |this, mut cx| async move {
331 let worktree = Worktree::local(path.clone(), visible, fs, next_entry_id, &mut cx).await;
332
333 let worktree = worktree?;
334
335 this.update(&mut cx, |this, cx| this.add(&worktree, cx))?;
336
337 if visible {
338 cx.update(|cx| {
339 cx.add_recent_document(path.as_path());
340 })
341 .log_err();
342 }
343
344 Ok(worktree)
345 })
346 }
347
348 pub fn add(&mut self, worktree: &Entity<Worktree>, cx: &mut Context<Self>) {
349 let worktree_id = worktree.read(cx).id();
350 debug_assert!(self.worktrees().all(|w| w.read(cx).id() != worktree_id));
351
352 let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
353 let handle = if push_strong_handle {
354 WorktreeHandle::Strong(worktree.clone())
355 } else {
356 WorktreeHandle::Weak(worktree.downgrade())
357 };
358 if self.worktrees_reordered {
359 self.worktrees.push(handle);
360 } else {
361 let i = match self
362 .worktrees
363 .binary_search_by_key(&Some(worktree.read(cx).abs_path()), |other| {
364 other.upgrade().map(|worktree| worktree.read(cx).abs_path())
365 }) {
366 Ok(i) | Err(i) => i,
367 };
368 self.worktrees.insert(i, handle);
369 }
370
371 cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
372 self.send_project_updates(cx);
373
374 let handle_id = worktree.entity_id();
375 cx.subscribe(worktree, |_, worktree, event, cx| {
376 let worktree_id = worktree.update(cx, |worktree, _| worktree.id());
377 match event {
378 worktree::Event::UpdatedEntries(changes) => {
379 cx.emit(WorktreeStoreEvent::WorktreeUpdatedEntries(
380 worktree_id,
381 changes.clone(),
382 ));
383 }
384 worktree::Event::UpdatedGitRepositories(_) => {
385 cx.emit(WorktreeStoreEvent::WorktreeUpdatedGitRepositories(
386 worktree_id,
387 ));
388 }
389 worktree::Event::DeletedEntry(id) => {
390 cx.emit(WorktreeStoreEvent::WorktreeDeletedEntry(worktree_id, *id))
391 }
392 }
393 })
394 .detach();
395 cx.observe_release(worktree, move |this, worktree, cx| {
396 cx.emit(WorktreeStoreEvent::WorktreeReleased(
397 handle_id,
398 worktree.id(),
399 ));
400 cx.emit(WorktreeStoreEvent::WorktreeRemoved(
401 handle_id,
402 worktree.id(),
403 ));
404 this.send_project_updates(cx);
405 })
406 .detach();
407 }
408
409 pub fn remove_worktree(&mut self, id_to_remove: WorktreeId, cx: &mut Context<Self>) {
410 self.worktrees.retain(|worktree| {
411 if let Some(worktree) = worktree.upgrade() {
412 if worktree.read(cx).id() == id_to_remove {
413 cx.emit(WorktreeStoreEvent::WorktreeRemoved(
414 worktree.entity_id(),
415 id_to_remove,
416 ));
417 false
418 } else {
419 true
420 }
421 } else {
422 false
423 }
424 });
425 self.send_project_updates(cx);
426 }
427
428 pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
429 self.worktrees_reordered = worktrees_reordered;
430 }
431
432 fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
433 match &self.state {
434 WorktreeStoreState::Remote {
435 upstream_client,
436 upstream_project_id,
437 ..
438 } => Some((upstream_client.clone(), *upstream_project_id)),
439 WorktreeStoreState::Local { .. } => None,
440 }
441 }
442
443 pub fn set_worktrees_from_proto(
444 &mut self,
445 worktrees: Vec<proto::WorktreeMetadata>,
446 replica_id: ReplicaId,
447 cx: &mut Context<Self>,
448 ) -> Result<()> {
449 let mut old_worktrees_by_id = self
450 .worktrees
451 .drain(..)
452 .filter_map(|worktree| {
453 let worktree = worktree.upgrade()?;
454 Some((worktree.read(cx).id(), worktree))
455 })
456 .collect::<HashMap<_, _>>();
457
458 let (client, project_id) = self
459 .upstream_client()
460 .clone()
461 .ok_or_else(|| anyhow!("invalid project"))?;
462
463 for worktree in worktrees {
464 if let Some(old_worktree) =
465 old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
466 {
467 let push_strong_handle =
468 self.retain_worktrees || old_worktree.read(cx).is_visible();
469 let handle = if push_strong_handle {
470 WorktreeHandle::Strong(old_worktree.clone())
471 } else {
472 WorktreeHandle::Weak(old_worktree.downgrade())
473 };
474 self.worktrees.push(handle);
475 } else {
476 self.add(
477 &Worktree::remote(project_id, replica_id, worktree, client.clone(), cx),
478 cx,
479 );
480 }
481 }
482 self.send_project_updates(cx);
483
484 Ok(())
485 }
486
487 pub fn move_worktree(
488 &mut self,
489 source: WorktreeId,
490 destination: WorktreeId,
491 cx: &mut Context<Self>,
492 ) -> Result<()> {
493 if source == destination {
494 return Ok(());
495 }
496
497 let mut source_index = None;
498 let mut destination_index = None;
499 for (i, worktree) in self.worktrees.iter().enumerate() {
500 if let Some(worktree) = worktree.upgrade() {
501 let worktree_id = worktree.read(cx).id();
502 if worktree_id == source {
503 source_index = Some(i);
504 if destination_index.is_some() {
505 break;
506 }
507 } else if worktree_id == destination {
508 destination_index = Some(i);
509 if source_index.is_some() {
510 break;
511 }
512 }
513 }
514 }
515
516 let source_index =
517 source_index.with_context(|| format!("Missing worktree for id {source}"))?;
518 let destination_index =
519 destination_index.with_context(|| format!("Missing worktree for id {destination}"))?;
520
521 if source_index == destination_index {
522 return Ok(());
523 }
524
525 let worktree_to_move = self.worktrees.remove(source_index);
526 self.worktrees.insert(destination_index, worktree_to_move);
527 self.worktrees_reordered = true;
528 cx.emit(WorktreeStoreEvent::WorktreeOrderChanged);
529 cx.notify();
530 Ok(())
531 }
532
533 pub fn disconnected_from_host(&mut self, cx: &mut App) {
534 for worktree in &self.worktrees {
535 if let Some(worktree) = worktree.upgrade() {
536 worktree.update(cx, |worktree, _| {
537 if let Some(worktree) = worktree.as_remote_mut() {
538 worktree.disconnected_from_host();
539 }
540 });
541 }
542 }
543 }
544
545 pub fn send_project_updates(&mut self, cx: &mut Context<Self>) {
546 let Some((downstream_client, project_id)) = self.downstream_client.clone() else {
547 return;
548 };
549
550 let update = proto::UpdateProject {
551 project_id,
552 worktrees: self.worktree_metadata_protos(cx),
553 };
554
555 // collab has bad concurrency guarantees, so we send requests in serial.
556 let update_project = if downstream_client.is_via_collab() {
557 Some(downstream_client.request(update))
558 } else {
559 downstream_client.send(update).log_err();
560 None
561 };
562 cx.spawn(|this, mut cx| async move {
563 if let Some(update_project) = update_project {
564 update_project.await?;
565 }
566
567 this.update(&mut cx, |this, cx| {
568 let worktrees = this.worktrees().collect::<Vec<_>>();
569
570 for worktree in worktrees {
571 worktree.update(cx, |worktree, cx| {
572 let client = downstream_client.clone();
573 worktree.observe_updates(project_id, cx, {
574 move |update| {
575 let client = client.clone();
576 async move {
577 if client.is_via_collab() {
578 client
579 .request(update)
580 .map(|result| result.log_err().is_some())
581 .await
582 } else {
583 client.send(update).log_err().is_some()
584 }
585 }
586 }
587 });
588 });
589
590 cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
591 }
592
593 anyhow::Ok(())
594 })
595 })
596 .detach_and_log_err(cx);
597 }
598
599 pub fn worktree_metadata_protos(&self, cx: &App) -> Vec<proto::WorktreeMetadata> {
600 self.worktrees()
601 .map(|worktree| {
602 let worktree = worktree.read(cx);
603 proto::WorktreeMetadata {
604 id: worktree.id().to_proto(),
605 root_name: worktree.root_name().into(),
606 visible: worktree.is_visible(),
607 abs_path: worktree.abs_path().to_proto(),
608 }
609 })
610 .collect()
611 }
612
613 pub fn shared(
614 &mut self,
615 remote_id: u64,
616 downstream_client: AnyProtoClient,
617 cx: &mut Context<Self>,
618 ) {
619 self.retain_worktrees = true;
620 self.downstream_client = Some((downstream_client, remote_id));
621
622 // When shared, retain all worktrees
623 for worktree_handle in self.worktrees.iter_mut() {
624 match worktree_handle {
625 WorktreeHandle::Strong(_) => {}
626 WorktreeHandle::Weak(worktree) => {
627 if let Some(worktree) = worktree.upgrade() {
628 *worktree_handle = WorktreeHandle::Strong(worktree);
629 }
630 }
631 }
632 }
633 self.send_project_updates(cx);
634 }
635
636 pub fn unshared(&mut self, cx: &mut Context<Self>) {
637 self.retain_worktrees = false;
638 self.downstream_client.take();
639
640 // When not shared, only retain the visible worktrees
641 for worktree_handle in self.worktrees.iter_mut() {
642 if let WorktreeHandle::Strong(worktree) = worktree_handle {
643 let is_visible = worktree.update(cx, |worktree, _| {
644 worktree.stop_observing_updates();
645 worktree.is_visible()
646 });
647 if !is_visible {
648 *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
649 }
650 }
651 }
652 }
653
654 /// search over all worktrees and return buffers that *might* match the search.
655 pub fn find_search_candidates(
656 &self,
657 query: SearchQuery,
658 limit: usize,
659 open_entries: HashSet<ProjectEntryId>,
660 fs: Arc<dyn Fs>,
661 cx: &Context<Self>,
662 ) -> Receiver<ProjectPath> {
663 let snapshots = self
664 .visible_worktrees(cx)
665 .filter_map(|tree| {
666 let tree = tree.read(cx);
667 Some((tree.snapshot(), tree.as_local()?.settings()))
668 })
669 .collect::<Vec<_>>();
670
671 let executor = cx.background_executor().clone();
672
673 // We want to return entries in the order they are in the worktrees, so we have one
674 // thread that iterates over the worktrees (and ignored directories) as necessary,
675 // and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
676 // channel.
677 // We spawn a number of workers that take items from the filter channel and check the query
678 // against the version of the file on disk.
679 let (filter_tx, filter_rx) = smol::channel::bounded(64);
680 let (output_tx, output_rx) = smol::channel::bounded(64);
681 let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();
682
683 let input = cx.background_spawn({
684 let fs = fs.clone();
685 let query = query.clone();
686 async move {
687 Self::find_candidate_paths(
688 fs,
689 snapshots,
690 open_entries,
691 query,
692 filter_tx,
693 output_tx,
694 )
695 .await
696 .log_err();
697 }
698 });
699 const MAX_CONCURRENT_FILE_SCANS: usize = 64;
700 let filters = cx.background_spawn(async move {
701 let fs = &fs;
702 let query = &query;
703 executor
704 .scoped(move |scope| {
705 for _ in 0..MAX_CONCURRENT_FILE_SCANS {
706 let filter_rx = filter_rx.clone();
707 scope.spawn(async move {
708 Self::filter_paths(fs, filter_rx, query)
709 .await
710 .log_with_level(log::Level::Debug);
711 })
712 }
713 })
714 .await;
715 });
716 cx.background_spawn(async move {
717 let mut matched = 0;
718 while let Ok(mut receiver) = output_rx.recv().await {
719 let Some(path) = receiver.next().await else {
720 continue;
721 };
722 let Ok(_) = matching_paths_tx.send(path).await else {
723 break;
724 };
725 matched += 1;
726 if matched == limit {
727 break;
728 }
729 }
730 drop(input);
731 drop(filters);
732 })
733 .detach();
734 matching_paths_rx
735 }
736
737 fn scan_ignored_dir<'a>(
738 fs: &'a Arc<dyn Fs>,
739 snapshot: &'a worktree::Snapshot,
740 path: &'a Path,
741 query: &'a SearchQuery,
742 include_root: bool,
743 filter_tx: &'a Sender<MatchingEntry>,
744 output_tx: &'a Sender<oneshot::Receiver<ProjectPath>>,
745 ) -> BoxFuture<'a, Result<()>> {
746 async move {
747 let abs_path = snapshot.abs_path().join(path);
748 let Some(mut files) = fs
749 .read_dir(&abs_path)
750 .await
751 .with_context(|| format!("listing ignored path {abs_path:?}"))
752 .log_err()
753 else {
754 return Ok(());
755 };
756
757 let mut results = Vec::new();
758
759 while let Some(Ok(file)) = files.next().await {
760 let Some(metadata) = fs
761 .metadata(&file)
762 .await
763 .with_context(|| format!("fetching fs metadata for {abs_path:?}"))
764 .log_err()
765 .flatten()
766 else {
767 continue;
768 };
769 if metadata.is_symlink || metadata.is_fifo {
770 continue;
771 }
772 results.push((
773 file.strip_prefix(snapshot.abs_path())?.to_path_buf(),
774 !metadata.is_dir,
775 ))
776 }
777 results.sort_by(|(a_path, _), (b_path, _)| a_path.cmp(b_path));
778 for (path, is_file) in results {
779 if is_file {
780 if query.filters_path() {
781 let matched_path = if include_root {
782 let mut full_path = PathBuf::from(snapshot.root_name());
783 full_path.push(&path);
784 query.file_matches(&full_path)
785 } else {
786 query.file_matches(&path)
787 };
788 if !matched_path {
789 continue;
790 }
791 }
792 let (tx, rx) = oneshot::channel();
793 output_tx.send(rx).await?;
794 filter_tx
795 .send(MatchingEntry {
796 respond: tx,
797 worktree_path: snapshot.abs_path().clone(),
798 path: ProjectPath {
799 worktree_id: snapshot.id(),
800 path: Arc::from(path),
801 },
802 })
803 .await?;
804 } else {
805 Self::scan_ignored_dir(
806 fs,
807 snapshot,
808 &path,
809 query,
810 include_root,
811 filter_tx,
812 output_tx,
813 )
814 .await?;
815 }
816 }
817 Ok(())
818 }
819 .boxed()
820 }
821
822 async fn find_candidate_paths(
823 fs: Arc<dyn Fs>,
824 snapshots: Vec<(worktree::Snapshot, WorktreeSettings)>,
825 open_entries: HashSet<ProjectEntryId>,
826 query: SearchQuery,
827 filter_tx: Sender<MatchingEntry>,
828 output_tx: Sender<oneshot::Receiver<ProjectPath>>,
829 ) -> Result<()> {
830 let include_root = snapshots.len() > 1;
831 for (snapshot, settings) in snapshots {
832 for entry in snapshot.entries(query.include_ignored(), 0) {
833 if entry.is_dir() && entry.is_ignored {
834 if !settings.is_path_excluded(&entry.path) {
835 Self::scan_ignored_dir(
836 &fs,
837 &snapshot,
838 &entry.path,
839 &query,
840 include_root,
841 &filter_tx,
842 &output_tx,
843 )
844 .await?;
845 }
846 continue;
847 }
848
849 if entry.is_fifo || !entry.is_file() {
850 continue;
851 }
852
853 if query.filters_path() {
854 let matched_path = if include_root {
855 let mut full_path = PathBuf::from(snapshot.root_name());
856 full_path.push(&entry.path);
857 query.file_matches(&full_path)
858 } else {
859 query.file_matches(&entry.path)
860 };
861 if !matched_path {
862 continue;
863 }
864 }
865
866 let (mut tx, rx) = oneshot::channel();
867
868 if open_entries.contains(&entry.id) {
869 tx.send(ProjectPath {
870 worktree_id: snapshot.id(),
871 path: entry.path.clone(),
872 })
873 .await?;
874 } else {
875 filter_tx
876 .send(MatchingEntry {
877 respond: tx,
878 worktree_path: snapshot.abs_path().clone(),
879 path: ProjectPath {
880 worktree_id: snapshot.id(),
881 path: entry.path.clone(),
882 },
883 })
884 .await?;
885 }
886
887 output_tx.send(rx).await?;
888 }
889 }
890 Ok(())
891 }
892
893 pub fn branches(
894 &self,
895 project_path: ProjectPath,
896 cx: &App,
897 ) -> Task<Result<Vec<git::repository::Branch>>> {
898 let Some(worktree) = self.worktree_for_id(project_path.worktree_id, cx) else {
899 return Task::ready(Err(anyhow!("No worktree found for ProjectPath")));
900 };
901
902 match worktree.read(cx) {
903 Worktree::Local(local_worktree) => {
904 let branches = util::maybe!({
905 let worktree_error = |error| {
906 format!(
907 "{} for worktree {}",
908 error,
909 local_worktree.abs_path().to_string_lossy()
910 )
911 };
912
913 let entry = local_worktree
914 .git_entry(project_path.path)
915 .with_context(|| worktree_error("No git entry found"))?;
916
917 let repo = local_worktree
918 .get_local_repo(&entry)
919 .with_context(|| worktree_error("No repository found"))?
920 .repo()
921 .clone();
922
923 repo.branches()
924 });
925
926 Task::ready(branches)
927 }
928 Worktree::Remote(remote_worktree) => {
929 let request = remote_worktree.client().request(proto::GitBranches {
930 project_id: remote_worktree.project_id(),
931 repository: Some(proto::ProjectPath {
932 worktree_id: project_path.worktree_id.to_proto(),
933 path: project_path.path.to_proto(), // Root path
934 }),
935 });
936
937 cx.background_spawn(async move {
938 let response = request.await?;
939
940 let branches = response
941 .branches
942 .into_iter()
943 .map(|proto_branch| git::repository::Branch {
944 is_head: proto_branch.is_head,
945 name: proto_branch.name.into(),
946 upstream: proto_branch.upstream.map(|upstream| {
947 git::repository::Upstream {
948 ref_name: upstream.ref_name.into(),
949 tracking: upstream
950 .tracking
951 .map(|tracking| {
952 git::repository::UpstreamTracking::Tracked(
953 git::repository::UpstreamTrackingStatus {
954 ahead: tracking.ahead as u32,
955 behind: tracking.behind as u32,
956 },
957 )
958 })
959 .unwrap_or(git::repository::UpstreamTracking::Gone),
960 }
961 }),
962 most_recent_commit: proto_branch.most_recent_commit.map(|commit| {
963 git::repository::CommitSummary {
964 sha: commit.sha.into(),
965 subject: commit.subject.into(),
966 commit_timestamp: commit.commit_timestamp,
967 }
968 }),
969 })
970 .collect();
971
972 Ok(branches)
973 })
974 }
975 }
976 }
977
978 pub fn update_or_create_branch(
979 &self,
980 repository: ProjectPath,
981 new_branch: String,
982 cx: &App,
983 ) -> Task<Result<()>> {
984 let Some(worktree) = self.worktree_for_id(repository.worktree_id, cx) else {
985 return Task::ready(Err(anyhow!("No worktree found for ProjectPath")));
986 };
987
988 match worktree.read(cx) {
989 Worktree::Local(local_worktree) => {
990 let result = util::maybe!({
991 let worktree_error = |error| {
992 format!(
993 "{} for worktree {}",
994 error,
995 local_worktree.abs_path().to_string_lossy()
996 )
997 };
998
999 let entry = local_worktree
1000 .git_entry(repository.path)
1001 .with_context(|| worktree_error("No git entry found"))?;
1002
1003 let repo = local_worktree
1004 .get_local_repo(&entry)
1005 .with_context(|| worktree_error("No repository found"))?
1006 .repo()
1007 .clone();
1008
1009 if !repo.branch_exits(&new_branch)? {
1010 repo.create_branch(&new_branch)?;
1011 }
1012
1013 repo.change_branch(&new_branch)?;
1014 Ok(())
1015 });
1016
1017 Task::ready(result)
1018 }
1019 Worktree::Remote(remote_worktree) => {
1020 let request = remote_worktree.client().request(proto::UpdateGitBranch {
1021 project_id: remote_worktree.project_id(),
1022 repository: Some(proto::ProjectPath {
1023 worktree_id: repository.worktree_id.to_proto(),
1024 path: repository.path.to_proto(), // Root path
1025 }),
1026 branch_name: new_branch,
1027 });
1028
1029 cx.background_spawn(async move {
1030 request.await?;
1031 Ok(())
1032 })
1033 }
1034 }
1035 }
1036
1037 async fn filter_paths(
1038 fs: &Arc<dyn Fs>,
1039 mut input: Receiver<MatchingEntry>,
1040 query: &SearchQuery,
1041 ) -> Result<()> {
1042 let mut input = pin!(input);
1043 while let Some(mut entry) = input.next().await {
1044 let abs_path = entry.worktree_path.join(&entry.path.path);
1045 let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
1046 continue;
1047 };
1048
1049 let mut file = BufReader::new(file);
1050 let file_start = file.fill_buf()?;
1051
1052 if let Err(Some(starting_position)) =
1053 std::str::from_utf8(file_start).map_err(|e| e.error_len())
1054 {
1055 // Before attempting to match the file content, throw away files that have invalid UTF-8 sequences early on;
1056 // That way we can still match files in a streaming fashion without having look at "obviously binary" files.
1057 log::debug!(
1058 "Invalid UTF-8 sequence in file {abs_path:?} at byte position {starting_position}"
1059 );
1060 continue;
1061 }
1062
1063 if query.detect(file).unwrap_or(false) {
1064 entry.respond.send(entry.path).await?
1065 }
1066 }
1067
1068 Ok(())
1069 }
1070
1071 pub async fn handle_create_project_entry(
1072 this: Entity<Self>,
1073 envelope: TypedEnvelope<proto::CreateProjectEntry>,
1074 mut cx: AsyncApp,
1075 ) -> Result<proto::ProjectEntryResponse> {
1076 let worktree = this.update(&mut cx, |this, cx| {
1077 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
1078 this.worktree_for_id(worktree_id, cx)
1079 .ok_or_else(|| anyhow!("worktree not found"))
1080 })??;
1081 Worktree::handle_create_entry(worktree, envelope.payload, cx).await
1082 }
1083
1084 pub async fn handle_copy_project_entry(
1085 this: Entity<Self>,
1086 envelope: TypedEnvelope<proto::CopyProjectEntry>,
1087 mut cx: AsyncApp,
1088 ) -> Result<proto::ProjectEntryResponse> {
1089 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1090 let worktree = this.update(&mut cx, |this, cx| {
1091 this.worktree_for_entry(entry_id, cx)
1092 .ok_or_else(|| anyhow!("worktree not found"))
1093 })??;
1094 Worktree::handle_copy_entry(worktree, envelope.payload, cx).await
1095 }
1096
1097 pub async fn handle_delete_project_entry(
1098 this: Entity<Self>,
1099 envelope: TypedEnvelope<proto::DeleteProjectEntry>,
1100 mut cx: AsyncApp,
1101 ) -> Result<proto::ProjectEntryResponse> {
1102 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1103 let worktree = this.update(&mut cx, |this, cx| {
1104 this.worktree_for_entry(entry_id, cx)
1105 .ok_or_else(|| anyhow!("worktree not found"))
1106 })??;
1107 Worktree::handle_delete_entry(worktree, envelope.payload, cx).await
1108 }
1109
1110 pub async fn handle_expand_project_entry(
1111 this: Entity<Self>,
1112 envelope: TypedEnvelope<proto::ExpandProjectEntry>,
1113 mut cx: AsyncApp,
1114 ) -> Result<proto::ExpandProjectEntryResponse> {
1115 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1116 let worktree = this
1117 .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1118 .ok_or_else(|| anyhow!("invalid request"))?;
1119 Worktree::handle_expand_entry(worktree, envelope.payload, cx).await
1120 }
1121
1122 pub async fn handle_expand_all_for_project_entry(
1123 this: Entity<Self>,
1124 envelope: TypedEnvelope<proto::ExpandAllForProjectEntry>,
1125 mut cx: AsyncApp,
1126 ) -> Result<proto::ExpandAllForProjectEntryResponse> {
1127 let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id);
1128 let worktree = this
1129 .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))?
1130 .ok_or_else(|| anyhow!("invalid request"))?;
1131 Worktree::handle_expand_all_for_entry(worktree, envelope.payload, cx).await
1132 }
1133
1134 pub async fn handle_git_branches(
1135 this: Entity<Self>,
1136 branches: TypedEnvelope<proto::GitBranches>,
1137 cx: AsyncApp,
1138 ) -> Result<proto::GitBranchesResponse> {
1139 let project_path = branches
1140 .payload
1141 .repository
1142 .clone()
1143 .context("Invalid GitBranches call")?;
1144 let project_path = ProjectPath {
1145 worktree_id: WorktreeId::from_proto(project_path.worktree_id),
1146 path: Arc::<Path>::from_proto(project_path.path),
1147 };
1148
1149 let branches = this
1150 .read_with(&cx, |this, cx| this.branches(project_path, cx))?
1151 .await?;
1152
1153 Ok(proto::GitBranchesResponse {
1154 branches: branches.iter().map(branch_to_proto).collect(),
1155 })
1156 }
1157
1158 pub async fn handle_update_branch(
1159 this: Entity<Self>,
1160 update_branch: TypedEnvelope<proto::UpdateGitBranch>,
1161 cx: AsyncApp,
1162 ) -> Result<proto::Ack> {
1163 let project_path = update_branch
1164 .payload
1165 .repository
1166 .clone()
1167 .context("Invalid GitBranches call")?;
1168 let project_path = ProjectPath {
1169 worktree_id: WorktreeId::from_proto(project_path.worktree_id),
1170 path: Arc::<Path>::from_proto(project_path.path),
1171 };
1172 let new_branch = update_branch.payload.branch_name;
1173
1174 this.read_with(&cx, |this, cx| {
1175 this.update_or_create_branch(project_path, new_branch, cx)
1176 })?
1177 .await?;
1178
1179 Ok(proto::Ack {})
1180 }
1181}
1182
1183#[derive(Clone, Debug)]
1184enum WorktreeHandle {
1185 Strong(Entity<Worktree>),
1186 Weak(WeakEntity<Worktree>),
1187}
1188
1189impl WorktreeHandle {
1190 fn upgrade(&self) -> Option<Entity<Worktree>> {
1191 match self {
1192 WorktreeHandle::Strong(handle) => Some(handle.clone()),
1193 WorktreeHandle::Weak(handle) => handle.upgrade(),
1194 }
1195 }
1196}