Implement a scoped pool on `executor::Background`

Antonio Scandurra created

Change summary

gpui/src/executor.rs |  75 +++++++++++++++++++++++++++--
zed/src/worktree.rs  | 114 +++++++++++++++++++++++++--------------------
2 files changed, 130 insertions(+), 59 deletions(-)

Detailed changes

gpui/src/executor.rs 🔗

@@ -3,12 +3,15 @@ use async_task::Runnable;
 pub use async_task::Task;
 use parking_lot::Mutex;
 use rand::prelude::*;
-use smol::prelude::*;
-use smol::{channel, Executor};
-use std::rc::Rc;
-use std::sync::mpsc::SyncSender;
-use std::sync::Arc;
-use std::{marker::PhantomData, thread};
+use smol::{channel, prelude::*, Executor};
+use std::{
+    marker::PhantomData,
+    mem,
+    pin::Pin,
+    rc::Rc,
+    sync::{mpsc::SyncSender, Arc},
+    thread,
+};
 
 use crate::platform;
 
@@ -25,6 +28,7 @@ pub enum Background {
     Deterministic(Arc<Deterministic>),
     Production {
         executor: Arc<smol::Executor<'static>>,
+        threads: usize,
         _stop: channel::Sender<()>,
     },
 }
@@ -155,8 +159,9 @@ impl Background {
     pub fn new() -> Self {
         let executor = Arc::new(Executor::new());
         let stop = channel::unbounded::<()>();
+        let threads = num_cpus::get();
 
-        for i in 0..num_cpus::get() {
+        for i in 0..threads {
             let executor = executor.clone();
             let stop = stop.1.clone();
             thread::Builder::new()
@@ -167,10 +172,18 @@ impl Background {
 
         Self::Production {
             executor,
+            threads,
             _stop: stop.0,
         }
     }
 
+    pub fn threads(&self) -> usize {
+        match self {
+            Self::Deterministic(_) => 1,
+            Self::Production { threads, .. } => *threads,
+        }
+    }
+
     pub fn spawn<T, F>(&self, future: F) -> Task<T>
     where
         T: 'static + Send,
@@ -181,6 +194,54 @@ impl Background {
             Self::Deterministic(executor) => executor.spawn(future),
         }
     }
+
+    pub async fn scoped<'scope, F>(&self, scheduler: F)
+    where
+        F: FnOnce(&mut Scope<'scope>),
+    {
+        let mut scope = Scope {
+            futures: Default::default(),
+            _phantom: PhantomData,
+        };
+        (scheduler)(&mut scope);
+        match self {
+            Self::Deterministic(_) => {
+                for spawned in scope.futures {
+                    spawned.await;
+                }
+            }
+            Self::Production { executor, .. } => {
+                let spawned = scope
+                    .futures
+                    .into_iter()
+                    .map(|f| executor.spawn(f))
+                    .collect::<Vec<_>>();
+                for task in spawned {
+                    task.await;
+                }
+            }
+        }
+    }
+}
+
+pub struct Scope<'a> {
+    futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
+    _phantom: PhantomData<&'a ()>,
+}
+
+impl<'a> Scope<'a> {
+    pub fn spawn<F>(&mut self, f: F)
+    where
+        F: Future<Output = ()> + Send + 'a,
+    {
+        let f = unsafe {
+            mem::transmute::<
+                Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
+                Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
+            >(Box::pin(f))
+        };
+        self.futures.push(f);
+    }
 }
 
 pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {

zed/src/worktree.rs 🔗

@@ -16,7 +16,7 @@ use anyhow::{anyhow, Context, Result};
 use atomic::Ordering::SeqCst;
 pub use fuzzy::{match_paths, PathMatch};
 use gpui::{
-    scoped_pool, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
+    executor, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
     Task, WeakModelHandle,
 };
 use lazy_static::lazy_static;
@@ -374,9 +374,13 @@ impl Worktree {
             Duration::from_millis(100),
         );
         let background_snapshot = tree.background_snapshot.clone();
-        let id = tree.id;
         std::thread::spawn(move || {
-            let scanner = BackgroundScanner::new(fs, background_snapshot, scan_states_tx, id);
+            let scanner = BackgroundScanner::new(
+                background_snapshot,
+                scan_states_tx,
+                fs,
+                Arc::new(executor::Background::new()),
+            );
             scanner.run(event_stream);
         });
         tree._event_stream_handle = Some(event_stream_handle);
@@ -392,12 +396,13 @@ impl Worktree {
     ) -> Self {
         let (tree, scan_states_tx) = LocalWorktree::new(path, languages, fs.clone(), cx);
         let background_snapshot = tree.background_snapshot.clone();
-        let id = tree.id;
         let fs = fs.clone();
+        let background = cx.background().clone();
         cx.background()
             .spawn(async move {
                 let events_rx = fs.events().await;
-                let scanner = BackgroundScanner::new(fs, background_snapshot, scan_states_tx, id);
+                let scanner =
+                    BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background);
                 scanner.run_test(events_rx).await;
             })
             .detach();
@@ -1980,21 +1985,21 @@ struct BackgroundScanner {
     fs: Arc<dyn Fs>,
     snapshot: Arc<Mutex<Snapshot>>,
     notify: Sender<ScanState>,
-    thread_pool: scoped_pool::Pool,
+    executor: Arc<executor::Background>,
 }
 
 impl BackgroundScanner {
     fn new(
-        fs: Arc<dyn Fs>,
         snapshot: Arc<Mutex<Snapshot>>,
         notify: Sender<ScanState>,
-        worktree_id: usize,
+        fs: Arc<dyn Fs>,
+        executor: Arc<executor::Background>,
     ) -> Self {
         Self {
             fs,
             snapshot,
             notify,
-            thread_pool: scoped_pool::Pool::new(16, format!("worktree-{}-scanner", worktree_id)),
+            executor,
         }
     }
 
@@ -2124,21 +2129,23 @@ impl BackgroundScanner {
             .unwrap();
             drop(tx);
 
-            self.thread_pool.scoped(|pool| {
-                for _ in 0..self.thread_pool.thread_count() {
-                    pool.execute(|| {
-                        while let Ok(job) = rx.recv() {
-                            if let Err(err) = smol::block_on(self.scan_dir(
-                                root_char_bag,
-                                next_entry_id.clone(),
-                                &job,
-                            )) {
-                                log::error!("error scanning {:?}: {}", job.abs_path, err);
+            self.executor
+                .scoped(|scope| {
+                    for _ in 0..self.executor.threads() {
+                        scope.spawn(async {
+                            while let Ok(job) = rx.recv() {
+                                if let Err(err) = smol::block_on(self.scan_dir(
+                                    root_char_bag,
+                                    next_entry_id.clone(),
+                                    &job,
+                                )) {
+                                    log::error!("error scanning {:?}: {}", job.abs_path, err);
+                                }
                             }
-                        }
-                    });
-                }
-            });
+                        });
+                    }
+                })
+                .await;
         }
 
         Ok(())
@@ -2324,30 +2331,31 @@ impl BackgroundScanner {
 
         // Scan any directories that were created as part of this event batch.
         drop(scan_queue_tx);
-        self.thread_pool.scoped(|pool| {
-            for _ in 0..self.thread_pool.thread_count() {
-                pool.execute(|| {
-                    while let Ok(job) = scan_queue_rx.recv() {
-                        if let Err(err) = smol::block_on(self.scan_dir(
-                            root_char_bag,
-                            next_entry_id.clone(),
-                            &job,
-                        )) {
-                            log::error!("error scanning {:?}: {}", job.abs_path, err);
+        self.executor
+            .scoped(|scope| {
+                for _ in 0..self.executor.threads() {
+                    scope.spawn(async {
+                        while let Ok(job) = scan_queue_rx.recv() {
+                            if let Err(err) = self
+                                .scan_dir(root_char_bag, next_entry_id.clone(), &job)
+                                .await
+                            {
+                                log::error!("error scanning {:?}: {}", job.abs_path, err);
+                            }
                         }
-                    }
-                });
-            }
-        });
+                    });
+                }
+            })
+            .await;
 
         // Attempt to detect renames only over a single batch of file-system events.
         self.snapshot.lock().removed_entry_ids.clear();
 
-        self.update_ignore_statuses();
+        self.update_ignore_statuses().await;
         true
     }
 
-    fn update_ignore_statuses(&self) {
+    async fn update_ignore_statuses(&self) {
         let mut snapshot = self.snapshot();
 
         let mut ignores_to_update = Vec::new();
@@ -2390,15 +2398,17 @@ impl BackgroundScanner {
         }
         drop(ignore_queue_tx);
 
-        self.thread_pool.scoped(|scope| {
-            for _ in 0..self.thread_pool.thread_count() {
-                scope.execute(|| {
-                    while let Ok(job) = ignore_queue_rx.recv() {
-                        self.update_ignore_status(job, &snapshot);
-                    }
-                });
-            }
-        });
+        self.executor
+            .scoped(|scope| {
+                for _ in 0..self.executor.threads() {
+                    scope.spawn(async {
+                        while let Ok(job) = ignore_queue_rx.recv() {
+                            self.update_ignore_status(job, &snapshot);
+                        }
+                    });
+                }
+            })
+            .await;
     }
 
     fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) {
@@ -3147,7 +3157,6 @@ mod tests {
 
             let (notify_tx, _notify_rx) = smol::channel::unbounded();
             let mut scanner = BackgroundScanner::new(
-                Arc::new(OsFs),
                 Arc::new(Mutex::new(Snapshot {
                     id: 0,
                     scan_id: 0,
@@ -3161,7 +3170,8 @@ mod tests {
                     next_entry_id: Default::default(),
                 })),
                 notify_tx,
-                0,
+                Arc::new(OsFs),
+                Arc::new(gpui::executor::Background::new()),
             );
             smol::block_on(scanner.scan_dirs()).unwrap();
             scanner.snapshot().check_invariants();
@@ -3186,7 +3196,6 @@ mod tests {
 
             let (notify_tx, _notify_rx) = smol::channel::unbounded();
             let mut new_scanner = BackgroundScanner::new(
-                scanner.fs.clone(),
                 Arc::new(Mutex::new(Snapshot {
                     id: 0,
                     scan_id: 0,
@@ -3200,7 +3209,8 @@ mod tests {
                     next_entry_id: Default::default(),
                 })),
                 notify_tx,
-                1,
+                scanner.fs.clone(),
+                scanner.executor.clone(),
             );
             smol::block_on(new_scanner.scan_dirs()).unwrap();
             assert_eq!(scanner.snapshot().to_vec(), new_scanner.snapshot().to_vec());