Use the same background executor for spawning CPU intensive tasks

Antonio Scandurra , Nathan Sobo , and Max Brunsfeld created

Co-Authored-By: Nathan Sobo <nathan@zed.dev>
Co-Authored-By: Max Brunsfeld <max@zed.dev>

Change summary

gpui/src/app.rs           |  15 ----
gpui/src/executor.rs      |  12 --
gpui_macros/src/lib.rs    |   2 
zed/src/file_finder.rs    |   4 
zed/src/worktree.rs       |  12 +-
zed/src/worktree/fuzzy.rs | 143 ++++++++++++++++++++--------------------
6 files changed, 85 insertions(+), 103 deletions(-)

Detailed changes

gpui/src/app.rs 🔗

@@ -123,7 +123,6 @@ impl App {
         let cx = Rc::new(RefCell::new(MutableAppContext::new(
             foreground,
             Arc::new(executor::Background::new()),
-            Arc::new(executor::Background::new()),
             Arc::new(platform),
             Rc::new(foreground_platform),
             (),
@@ -140,7 +139,6 @@ impl App {
         let app = Self(Rc::new(RefCell::new(MutableAppContext::new(
             foreground,
             Arc::new(executor::Background::new()),
-            Arc::new(executor::Background::new()),
             platform.clone(),
             foreground_platform.clone(),
             asset_source,
@@ -247,7 +245,6 @@ impl TestAppContext {
     pub fn new(
         foreground: Rc<executor::Foreground>,
         background: Arc<executor::Background>,
-        thread_pool: Arc<executor::Background>,
         first_entity_id: usize,
     ) -> Self {
         let platform = Arc::new(platform::test::platform());
@@ -255,7 +252,6 @@ impl TestAppContext {
         let mut cx = MutableAppContext::new(
             foreground.clone(),
             background,
-            thread_pool,
             platform,
             foreground_platform.clone(),
             (),
@@ -594,7 +590,6 @@ impl MutableAppContext {
     fn new(
         foreground: Rc<executor::Foreground>,
         background: Arc<executor::Background>,
-        thread_pool: Arc<executor::Background>,
         platform: Arc<dyn platform::Platform>,
         foreground_platform: Rc<dyn platform::ForegroundPlatform>,
         asset_source: impl AssetSource,
@@ -612,7 +607,6 @@ impl MutableAppContext {
                 values: Default::default(),
                 ref_counts: Arc::new(Mutex::new(RefCounts::default())),
                 background,
-                thread_pool,
                 font_cache: Arc::new(FontCache::new(fonts)),
             },
             actions: HashMap::new(),
@@ -1490,7 +1484,6 @@ pub struct AppContext {
     values: RwLock<HashMap<(TypeId, usize), Box<dyn Any>>>,
     background: Arc<executor::Background>,
     ref_counts: Arc<Mutex<RefCounts>>,
-    thread_pool: Arc<executor::Background>,
     font_cache: Arc<FontCache>,
 }
 
@@ -1535,10 +1528,6 @@ impl AppContext {
         &self.font_cache
     }
 
-    pub fn thread_pool(&self) -> &Arc<executor::Background> {
-        &self.thread_pool
-    }
-
     pub fn value<Tag: 'static, T: 'static + Default>(&self, id: usize) -> ValueHandle<T> {
         let key = (TypeId::of::<Tag>(), id);
         let mut values = self.values.write();
@@ -1721,10 +1710,6 @@ impl<'a, T: Entity> ModelContext<'a, T> {
         &self.app.cx.background
     }
 
-    pub fn thread_pool(&self) -> &Arc<executor::Background> {
-        &self.app.cx.thread_pool
-    }
-
     pub fn halt_stream(&mut self) {
         self.halt_stream = true;
     }

gpui/src/executor.rs 🔗

@@ -34,7 +34,6 @@ pub enum Background {
     Deterministic(Arc<Deterministic>),
     Production {
         executor: Arc<smol::Executor<'static>>,
-        threads: usize,
         _stop: channel::Sender<()>,
     },
 }
@@ -324,9 +323,8 @@ impl Background {
     pub fn new() -> Self {
         let executor = Arc::new(Executor::new());
         let stop = channel::unbounded::<()>();
-        let threads = num_cpus::get();
 
-        for i in 0..threads {
+        for i in 0..2 * num_cpus::get() {
             let executor = executor.clone();
             let stop = stop.1.clone();
             thread::Builder::new()
@@ -337,16 +335,12 @@ impl Background {
 
         Self::Production {
             executor,
-            threads,
             _stop: stop.0,
         }
     }
 
-    pub fn threads(&self) -> usize {
-        match self {
-            Self::Deterministic(_) => 1,
-            Self::Production { threads, .. } => *threads,
-        }
+    pub fn num_cpus(&self) -> usize {
+        num_cpus::get()
     }
 
     pub fn spawn<T, F>(&self, future: F) -> Task<T>

gpui_macros/src/lib.rs 🔗

@@ -60,7 +60,7 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream {
     let inner_fn_args = (0..inner_fn.sig.inputs.len())
         .map(|i| {
             let first_entity_id = i * 100_000;
-            quote!(#namespace::TestAppContext::new(foreground.clone(), background.clone(), background.clone(), #first_entity_id),)
+            quote!(#namespace::TestAppContext::new(foreground.clone(), background.clone(), #first_entity_id),)
         })
         .collect::<proc_macro2::TokenStream>();
 

zed/src/file_finder.rs 🔗

@@ -399,7 +399,7 @@ impl FileFinder {
             .map(|tree| tree.read(cx).snapshot())
             .collect::<Vec<_>>();
         let search_id = util::post_inc(&mut self.search_count);
-        let pool = cx.as_ref().thread_pool().clone();
+        let background = cx.as_ref().background().clone();
         self.cancel_flag.store(true, atomic::Ordering::Relaxed);
         self.cancel_flag = Arc::new(AtomicBool::new(false));
         let cancel_flag = self.cancel_flag.clone();
@@ -413,7 +413,7 @@ impl FileFinder {
                 false,
                 100,
                 cancel_flag.clone(),
-                pool,
+                background,
             )
             .await;
             let did_cancel = cancel_flag.load(atomic::Ordering::Relaxed);

zed/src/worktree.rs 🔗

@@ -552,11 +552,11 @@ impl Worktree {
             let tree = tree.as_local_mut().unwrap();
             let abs_path = tree.snapshot.abs_path.clone();
             let background_snapshot = tree.background_snapshot.clone();
-            let thread_pool = cx.thread_pool().clone();
+            let background = cx.background().clone();
             tree._background_scanner_task = Some(cx.background().spawn(async move {
                 let events = fs.watch(&abs_path, Duration::from_millis(100)).await;
                 let scanner =
-                    BackgroundScanner::new(background_snapshot, scan_states_tx, fs, thread_pool);
+                    BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
                 scanner.run(events).await;
             }));
         });
@@ -2295,7 +2295,7 @@ impl BackgroundScanner {
 
             self.executor
                 .scoped(|scope| {
-                    for _ in 0..self.executor.threads() {
+                    for _ in 0..self.executor.num_cpus() {
                         scope.spawn(async {
                             while let Ok(job) = rx.recv().await {
                                 if let Err(err) = self
@@ -2487,7 +2487,7 @@ impl BackgroundScanner {
         drop(scan_queue_tx);
         self.executor
             .scoped(|scope| {
-                for _ in 0..self.executor.threads() {
+                for _ in 0..self.executor.num_cpus() {
                     scope.spawn(async {
                         while let Ok(job) = scan_queue_rx.recv().await {
                             if let Err(err) = self
@@ -2555,7 +2555,7 @@ impl BackgroundScanner {
 
         self.executor
             .scoped(|scope| {
-                for _ in 0..self.executor.threads() {
+                for _ in 0..self.executor.num_cpus() {
                     scope.spawn(async {
                         while let Ok(job) = ignore_queue_rx.recv().await {
                             self.update_ignore_status(job, &snapshot).await;
@@ -3044,7 +3044,7 @@ mod tests {
                     false,
                     10,
                     Default::default(),
-                    cx.thread_pool().clone(),
+                    cx.background().clone(),
                 )
             })
             .await;

zed/src/worktree/fuzzy.rs 🔗

@@ -59,7 +59,7 @@ pub async fn match_paths<'a, T>(
     smart_case: bool,
     max_results: usize,
     cancel_flag: Arc<AtomicBool>,
-    pool: Arc<executor::Background>,
+    background: Arc<executor::Background>,
 ) -> Vec<PathMatch>
 where
     T: Clone + Send + Iterator<Item = &'a Snapshot> + 'a,
@@ -77,82 +77,85 @@ where
         snapshots.clone().map(Snapshot::visible_file_count).sum()
     };
 
-    let segment_size = (path_count + pool.threads() - 1) / pool.threads();
-    let mut segment_results = (0..pool.threads())
+    let num_cpus = background.num_cpus().min(path_count);
+    let segment_size = (path_count + num_cpus - 1) / num_cpus;
+    let mut segment_results = (0..num_cpus)
         .map(|_| Vec::with_capacity(max_results))
         .collect::<Vec<_>>();
 
-    pool.scoped(|scope| {
-        for (segment_idx, results) in segment_results.iter_mut().enumerate() {
-            let snapshots = snapshots.clone();
-            let cancel_flag = &cancel_flag;
-            scope.spawn(async move {
-                let segment_start = segment_idx * segment_size;
-                let segment_end = segment_start + segment_size;
-
-                let mut min_score = 0.0;
-                let mut last_positions = Vec::new();
-                last_positions.resize(query.len(), 0);
-                let mut match_positions = Vec::new();
-                match_positions.resize(query.len(), 0);
-                let mut score_matrix = Vec::new();
-                let mut best_position_matrix = Vec::new();
-
-                let mut tree_start = 0;
-                for snapshot in snapshots {
-                    let tree_end = if include_ignored {
-                        tree_start + snapshot.file_count()
-                    } else {
-                        tree_start + snapshot.visible_file_count()
-                    };
-
-                    let include_root_name = include_root_name || snapshot.root_entry().is_file();
-                    if tree_start < segment_end && segment_start < tree_end {
-                        let start = max(tree_start, segment_start) - tree_start;
-                        let end = min(tree_end, segment_end) - tree_start;
-                        let entries = if include_ignored {
-                            snapshot.files(start).take(end - start)
+    background
+        .scoped(|scope| {
+            for (segment_idx, results) in segment_results.iter_mut().enumerate() {
+                let snapshots = snapshots.clone();
+                let cancel_flag = &cancel_flag;
+                scope.spawn(async move {
+                    let segment_start = segment_idx * segment_size;
+                    let segment_end = segment_start + segment_size;
+
+                    let mut min_score = 0.0;
+                    let mut last_positions = Vec::new();
+                    last_positions.resize(query.len(), 0);
+                    let mut match_positions = Vec::new();
+                    match_positions.resize(query.len(), 0);
+                    let mut score_matrix = Vec::new();
+                    let mut best_position_matrix = Vec::new();
+
+                    let mut tree_start = 0;
+                    for snapshot in snapshots {
+                        let tree_end = if include_ignored {
+                            tree_start + snapshot.file_count()
                         } else {
-                            snapshot.visible_files(start).take(end - start)
+                            tree_start + snapshot.visible_file_count()
                         };
-                        let paths = entries.map(|entry| {
-                            if let EntryKind::File(char_bag) = entry.kind {
-                                MatchCandidate {
-                                    path: &entry.path,
-                                    char_bag,
-                                }
+
+                        let include_root_name =
+                            include_root_name || snapshot.root_entry().is_file();
+                        if tree_start < segment_end && segment_start < tree_end {
+                            let start = max(tree_start, segment_start) - tree_start;
+                            let end = min(tree_end, segment_end) - tree_start;
+                            let entries = if include_ignored {
+                                snapshot.files(start).take(end - start)
                             } else {
-                                unreachable!()
-                            }
-                        });
-
-                        match_single_tree_paths(
-                            snapshot,
-                            include_root_name,
-                            paths,
-                            query,
-                            lowercase_query,
-                            query_chars,
-                            smart_case,
-                            results,
-                            max_results,
-                            &mut min_score,
-                            &mut match_positions,
-                            &mut last_positions,
-                            &mut score_matrix,
-                            &mut best_position_matrix,
-                            &cancel_flag,
-                        );
-                    }
-                    if tree_end >= segment_end {
-                        break;
+                                snapshot.visible_files(start).take(end - start)
+                            };
+                            let paths = entries.map(|entry| {
+                                if let EntryKind::File(char_bag) = entry.kind {
+                                    MatchCandidate {
+                                        path: &entry.path,
+                                        char_bag,
+                                    }
+                                } else {
+                                    unreachable!()
+                                }
+                            });
+
+                            match_single_tree_paths(
+                                snapshot,
+                                include_root_name,
+                                paths,
+                                query,
+                                lowercase_query,
+                                query_chars,
+                                smart_case,
+                                results,
+                                max_results,
+                                &mut min_score,
+                                &mut match_positions,
+                                &mut last_positions,
+                                &mut score_matrix,
+                                &mut best_position_matrix,
+                                &cancel_flag,
+                            );
+                        }
+                        if tree_end >= segment_end {
+                            break;
+                        }
+                        tree_start = tree_end;
                     }
-                    tree_start = tree_end;
-                }
-            })
-        }
-    })
-    .await;
+                })
+            }
+        })
+        .await;
 
     let mut results = Vec::new();
     for segment_result in segment_results {