From 614d143571826653512d6e8ef722a2a00e04f3aa Mon Sep 17 00:00:00 2001
From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com>
Date: Fri, 19 Dec 2025 22:24:13 +0100
Subject: [PATCH] Less scoping and lifetimes for workers

---
 crates/project/src/project_search.rs | 96 +++++++++++++++-------------
 1 file changed, 53 insertions(+), 43 deletions(-)

diff --git a/crates/project/src/project_search.rs b/crates/project/src/project_search.rs
index 249df8ebd1648ca024be9d55c64bf2440b45e76d..693dcf73d8ce4325c222766614f0399d34e9266b 100644
--- a/crates/project/src/project_search.rs
+++ b/crates/project/src/project_search.rs
@@ -173,6 +173,7 @@ impl Search {
                 unnamed_buffers.push(handle)
             };
         }
+        let open_buffers = Arc::new(open_buffers);
         let executor = cx.background_executor().clone();
         let (tx, rx) = unbounded();
         let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
@@ -223,7 +224,7 @@ impl Search {
             ))
             .boxed_local(),
             Self::open_buffers(
-                &self.buffer_store,
+                self.buffer_store,
                 get_buffer_for_full_scan_rx,
                 grab_buffer_snapshot_tx,
                 cx.clone(),
@@ -256,24 +257,26 @@ impl Search {
             query: Some(query.to_proto()),
             limit: self.limit as _,
         });
+        let weak_buffer_store = self.buffer_store.downgrade();
+        let buffer_store = self.buffer_store;
         let Ok(guard) = cx.update(|cx| {
             Project::retain_remotely_created_models_impl(
                 &models,
-                &self.buffer_store,
+                &buffer_store,
                 &self.worktree_store,
                 cx,
             )
         }) else {
             return;
         };
-        let buffer_store = self.buffer_store.downgrade();
+
         let issue_remote_buffers_request = cx
             .spawn(async move |cx| {
                 let _ = maybe!(async move {
                     let response = request.await?;
                     for buffer_id in response.buffer_ids {
                         let buffer_id = BufferId::new(buffer_id)?;
-                        let buffer = buffer_store
+                        let buffer = weak_buffer_store
                             .update(cx, |buffer_store, cx| {
                                 buffer_store.wait_for_remote_buffer(buffer_id, cx)
                             })?
@@ -297,22 +300,27 @@ impl Search {
 
         let should_find_all_matches = !tx.is_closed();
 
-        let worker_pool = executor.scoped(|scope| {
-            let num_cpus = executor.num_cpus();
+        let _executor = executor.clone();
+        let worker_pool = executor.spawn(async move {
+            let num_cpus = _executor.num_cpus();
             assert!(num_cpus > 0);
 
-            for _ in 0..executor.num_cpus() - 1 {
-                let worker = Worker {
-                    query: &query,
-                    open_buffers: &open_buffers,
-                    candidates: candidate_searcher.clone(),
-                    find_all_matches_rx: find_all_matches_rx.clone(),
-                };
-                scope.spawn(worker.run());
-            }
+            _executor
+                .scoped(|scope| {
+                    for _ in 0..num_cpus - 1 {
+                        let worker = Worker {
+                            query: query.clone(),
+                            open_buffers: open_buffers.clone(),
+                            candidates: candidate_searcher.clone(),
+                            find_all_matches_rx: find_all_matches_rx.clone(),
+                        };
+                        scope.spawn(worker.run());
+                    }
 
-            drop(find_all_matches_rx);
-            drop(candidate_searcher);
+                    drop(find_all_matches_rx);
+                    drop(candidate_searcher);
+                })
+                .await;
         });
 
         let (sorted_matches_tx, sorted_matches_rx) = unbounded();
@@ -374,6 +382,7 @@ impl Search {
             async move |cx| {
                 _ = maybe!(async move {
                     let gitignored_tracker = PathInclusionMatcher::new(query.clone());
+                    let include_ignored = query.include_ignored();
                     for worktree in worktrees {
                         let (mut snapshot, worktree_settings) = worktree
                             .read_with(cx, |this, _| {
@@ -406,27 +415,28 @@ impl Search {
                            }
                            snapshot = worktree.read_with(cx, |this, _| this.snapshot())?;
                        }
+                    let tx = tx.clone();
+                    let results = results.clone();
+
                     cx.background_executor()
-                        .scoped(|scope| {
-                            scope.spawn(async {
-                                for entry in snapshot.files(query.include_ignored(), 0) {
-                                    let (should_scan_tx, should_scan_rx) = oneshot::channel();
-
-                                    let Ok(_) = tx
-                                        .send(InputPath {
-                                            entry: entry.clone(),
-                                            snapshot: snapshot.clone(),
-                                            should_scan_tx,
-                                        })
-                                        .await
-                                    else {
-                                        return;
-                                    };
-                                    if results.send(should_scan_rx).await.is_err() {
-                                        return;
-                                    };
-                                }
-                            })
+                        .spawn(async move {
+                            for entry in snapshot.files(include_ignored, 0) {
+                                let (should_scan_tx, should_scan_rx) = oneshot::channel();
+
+                                let Ok(_) = tx
+                                    .send(InputPath {
+                                        entry: entry.clone(),
+                                        snapshot: snapshot.clone(),
+                                        should_scan_tx,
+                                    })
+                                    .await
+                                else {
+                                    return;
+                                };
+                                if results.send(should_scan_rx).await.is_err() {
+                                    return;
+                                };
+                            }
                         })
                         .await;
                 }
@@ -460,7 +470,7 @@ impl Search {
 
     /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
     async fn open_buffers(
-        buffer_store: &Entity<BufferStore>,
+        buffer_store: Entity<BufferStore>,
        rx: Receiver,
        find_all_matches_tx: Sender>,
        mut cx: AsyncApp,
@@ -578,9 +588,9 @@ impl Search {
     }
 }
 
-struct Worker<'search> {
-    query: &'search SearchQuery,
-    open_buffers: &'search HashSet,
+struct Worker {
+    query: Arc<SearchQuery>,
+    open_buffers: Arc<HashSet>,
     candidates: FindSearchCandidates,
     /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
     /// Then, when you're done, share them via the channel you were given.
     )>,
 }
 
-impl Worker<'_> {
+impl Worker {
     async fn run(self) {
         let (
             input_paths_rx,
@@ -622,7 +632,7 @@ impl Worker<'_> {
 
         loop {
             let handler = RequestHandler {
-                query: self.query,
+                query: &self.query,
                 open_entries: &self.open_buffers,
                 fs: fs.as_deref(),
                 confirm_contents_will_match_tx: &confirm_contents_will_match_tx,