Less scoping and lifetimes for workers

Piotr Osiewicz created

Change summary

crates/project/src/project_search.rs | 96 ++++++++++++++++-------------
1 file changed, 53 insertions(+), 43 deletions(-)

Detailed changes

crates/project/src/project_search.rs 🔗

@@ -173,6 +173,7 @@ impl Search {
                 unnamed_buffers.push(handle)
             };
         }
+        let open_buffers = Arc::new(open_buffers);
         let executor = cx.background_executor().clone();
         let (tx, rx) = unbounded();
         let (grab_buffer_snapshot_tx, grab_buffer_snapshot_rx) = unbounded();
@@ -223,7 +224,7 @@ impl Search {
                             ))
                             .boxed_local(),
                             Self::open_buffers(
-                                &self.buffer_store,
+                                self.buffer_store,
                                 get_buffer_for_full_scan_rx,
                                 grab_buffer_snapshot_tx,
                                 cx.clone(),
@@ -256,24 +257,26 @@ impl Search {
                             query: Some(query.to_proto()),
                             limit: self.limit as _,
                         });
+                        let weak_buffer_store = self.buffer_store.downgrade();
+                        let buffer_store = self.buffer_store;
                         let Ok(guard) = cx.update(|cx| {
                             Project::retain_remotely_created_models_impl(
                                 &models,
-                                &self.buffer_store,
+                                &buffer_store,
                                 &self.worktree_store,
                                 cx,
                             )
                         }) else {
                             return;
                         };
-                        let buffer_store = self.buffer_store.downgrade();
+
                         let issue_remote_buffers_request = cx
                             .spawn(async move |cx| {
                                 let _ = maybe!(async move {
                                     let response = request.await?;
                                     for buffer_id in response.buffer_ids {
                                         let buffer_id = BufferId::new(buffer_id)?;
-                                        let buffer = buffer_store
+                                        let buffer = weak_buffer_store
                                             .update(cx, |buffer_store, cx| {
                                                 buffer_store.wait_for_remote_buffer(buffer_id, cx)
                                             })?
@@ -297,22 +300,27 @@ impl Search {
 
                 let should_find_all_matches = !tx.is_closed();
 
-                let worker_pool = executor.scoped(|scope| {
-                    let num_cpus = executor.num_cpus();
+                let _executor = executor.clone();
+                let worker_pool = executor.spawn(async move {
+                    let num_cpus = _executor.num_cpus();
 
                     assert!(num_cpus > 0);
-                    for _ in 0..executor.num_cpus() - 1 {
-                        let worker = Worker {
-                            query: &query,
-                            open_buffers: &open_buffers,
-                            candidates: candidate_searcher.clone(),
-                            find_all_matches_rx: find_all_matches_rx.clone(),
-                        };
-                        scope.spawn(worker.run());
-                    }
+                    _executor
+                        .scoped(|scope| {
+                            for _ in 0..num_cpus - 1 {
+                                let worker = Worker {
+                                    query: query.clone(),
+                                    open_buffers: open_buffers.clone(),
+                                    candidates: candidate_searcher.clone(),
+                                    find_all_matches_rx: find_all_matches_rx.clone(),
+                                };
+                                scope.spawn(worker.run());
+                            }
 
-                    drop(find_all_matches_rx);
-                    drop(candidate_searcher);
+                            drop(find_all_matches_rx);
+                            drop(candidate_searcher);
+                        })
+                        .await;
                 });
 
                 let (sorted_matches_tx, sorted_matches_rx) = unbounded();
@@ -374,6 +382,7 @@ impl Search {
         async move |cx| {
             _ = maybe!(async move {
                 let gitignored_tracker = PathInclusionMatcher::new(query.clone());
+                let include_ignored = query.include_ignored();
                 for worktree in worktrees {
                     let (mut snapshot, worktree_settings) = worktree
                         .read_with(cx, |this, _| {
@@ -406,27 +415,28 @@ impl Search {
                         }
                         snapshot = worktree.read_with(cx, |this, _| this.snapshot())?;
                     }
+                    let tx = tx.clone();
+                    let results = results.clone();
+
                     cx.background_executor()
-                        .scoped(|scope| {
-                            scope.spawn(async {
-                                for entry in snapshot.files(query.include_ignored(), 0) {
-                                    let (should_scan_tx, should_scan_rx) = oneshot::channel();
-
-                                    let Ok(_) = tx
-                                        .send(InputPath {
-                                            entry: entry.clone(),
-                                            snapshot: snapshot.clone(),
-                                            should_scan_tx,
-                                        })
-                                        .await
-                                    else {
-                                        return;
-                                    };
-                                    if results.send(should_scan_rx).await.is_err() {
-                                        return;
-                                    };
-                                }
-                            })
+                        .spawn(async move {
+                            for entry in snapshot.files(include_ignored, 0) {
+                                let (should_scan_tx, should_scan_rx) = oneshot::channel();
+
+                                let Ok(_) = tx
+                                    .send(InputPath {
+                                        entry: entry.clone(),
+                                        snapshot: snapshot.clone(),
+                                        should_scan_tx,
+                                    })
+                                    .await
+                                else {
+                                    return;
+                                };
+                                if results.send(should_scan_rx).await.is_err() {
+                                    return;
+                                };
+                            }
                         })
                         .await;
                 }
@@ -460,7 +470,7 @@ impl Search {
 
     /// Background workers cannot open buffers by themselves, hence main thread will do it on their behalf.
     async fn open_buffers(
-        buffer_store: &Entity<BufferStore>,
+        buffer_store: Entity<BufferStore>,
         rx: Receiver<ProjectPath>,
         find_all_matches_tx: Sender<Entity<Buffer>>,
         mut cx: AsyncApp,
@@ -578,9 +588,9 @@ impl Search {
     }
 }
 
-struct Worker<'search> {
-    query: &'search SearchQuery,
-    open_buffers: &'search HashSet<ProjectEntryId>,
+struct Worker {
+    query: Arc<SearchQuery>,
+    open_buffers: Arc<HashSet<ProjectEntryId>>,
     candidates: FindSearchCandidates,
     /// Ok, we're back in background: run full scan & find all matches in a given buffer snapshot.
     /// Then, when you're done, share them via the channel you were given.
@@ -591,7 +601,7 @@ struct Worker<'search> {
     )>,
 }
 
-impl Worker<'_> {
+impl Worker {
     async fn run(self) {
         let (
             input_paths_rx,
@@ -622,7 +632,7 @@ impl Worker<'_> {
 
         loop {
             let handler = RequestHandler {
-                query: self.query,
+                query: &self.query,
                 open_entries: &self.open_buffers,
                 fs: fs.as_deref(),
                 confirm_contents_will_match_tx: &confirm_contents_will_match_tx,