WIP: Start restructuring executor

Antonio Scandurra created

Change summary

Cargo.lock                            |   1 
crates/gpui/Cargo.toml                |   4 
crates/gpui/src/executor.rs           | 315 ++++++++++------------------
crates/gpui/src/test.rs               |  15 
crates/gpui_macros/src/gpui_macros.rs |   8 
crates/server/src/rpc.rs              |   2 
6 files changed, 135 insertions(+), 210 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -2128,6 +2128,7 @@ dependencies = [
  "block",
  "cc",
  "cocoa",
+ "collections",
  "core-foundation",
  "core-graphics",
  "core-text",

crates/gpui/Cargo.toml 🔗

@@ -8,9 +8,10 @@ version = "0.1.0"
 path = "src/gpui.rs"
 
 [features]
-test-support = ["env_logger"]
+test-support = ["env_logger", "collections/test-support"]
 
 [dependencies]
+collections = { path = "../collections" }
 gpui_macros = { path = "../gpui_macros" }
 sum_tree = { path = "../sum_tree" }
 async-task = "4.0.3"
@@ -47,6 +48,7 @@ bindgen = "0.58.1"
 cc = "1.0.67"
 
 [dev-dependencies]
+collections = { path = "../collections", features = ["test-support"] }
 env_logger = "0.8"
 png = "0.16"
 simplelog = "0.9"

crates/gpui/src/executor.rs 🔗

@@ -1,6 +1,7 @@
 use anyhow::{anyhow, Result};
 use async_task::Runnable;
 use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
+use collections::HashMap;
 use parking_lot::Mutex;
 use postage::{barrier, prelude::Stream as _};
 use rand::prelude::*;
@@ -33,7 +34,10 @@ pub enum Foreground {
         dispatcher: Arc<dyn platform::Dispatcher>,
         _not_send_or_sync: PhantomData<Rc<()>>,
     },
-    Deterministic(Arc<Deterministic>),
+    Deterministic {
+        cx_id: usize,
+        executor: Arc<Deterministic>,
+    },
 }
 
 pub enum Background {
@@ -69,9 +73,8 @@ unsafe impl<T: Send> Send for Task<T> {}
 struct DeterministicState {
     rng: StdRng,
     seed: u64,
-    scheduled_from_foreground: Vec<(Runnable, Backtrace)>,
-    scheduled_from_background: Vec<(Runnable, Backtrace)>,
-    spawned_from_foreground: Vec<(Runnable, Backtrace)>,
+    scheduled_from_foreground: HashMap<usize, Vec<ScheduledForeground>>,
+    scheduled_from_background: Vec<Runnable>,
     forbid_parking: bool,
     block_on_ticks: RangeInclusive<usize>,
     now: Instant,
@@ -79,20 +82,24 @@ struct DeterministicState {
     waiting_backtrace: Option<Backtrace>,
 }
 
+enum ScheduledForeground {
+    MainFuture,
+    Runnable(Runnable),
+}
+
 pub struct Deterministic {
     state: Arc<Mutex<DeterministicState>>,
     parker: Mutex<parking::Parker>,
 }
 
 impl Deterministic {
-    fn new(seed: u64) -> Self {
-        Self {
+    pub fn new(seed: u64) -> Arc<Self> {
+        Arc::new(Self {
             state: Arc::new(Mutex::new(DeterministicState {
                 rng: StdRng::seed_from_u64(seed),
                 seed,
                 scheduled_from_foreground: Default::default(),
                 scheduled_from_background: Default::default(),
-                spawned_from_foreground: Default::default(),
                 forbid_parking: false,
                 block_on_ticks: 0..=1000,
                 now: Instant::now(),
@@ -100,22 +107,32 @@ impl Deterministic {
                 waiting_backtrace: None,
             })),
             parker: Default::default(),
-        }
+        })
+    }
+
+    pub fn build_background(self: &Arc<Self>) -> Arc<Background> {
+        Arc::new(Background::Deterministic {
+            executor: self.clone(),
+        })
+    }
+
+    pub fn build_foreground(self: &Arc<Self>, id: usize) -> Rc<Foreground> {
+        Rc::new(Foreground::Deterministic {
+            cx_id: id,
+            executor: self.clone(),
+        })
     }
 
-    fn spawn_from_foreground(&self, future: AnyLocalFuture) -> AnyLocalTask {
-        let backtrace = Backtrace::new_unresolved();
-        let scheduled_once = AtomicBool::new(false);
+    fn spawn_from_foreground(&self, cx_id: usize, future: AnyLocalFuture) -> AnyLocalTask {
         let state = self.state.clone();
         let unparker = self.parker.lock().unparker();
         let (runnable, task) = async_task::spawn_local(future, move |runnable| {
             let mut state = state.lock();
-            let backtrace = backtrace.clone();
-            if scheduled_once.fetch_or(true, SeqCst) {
-                state.scheduled_from_foreground.push((runnable, backtrace));
-            } else {
-                state.spawned_from_foreground.push((runnable, backtrace));
-            }
+            state
+                .scheduled_from_foreground
+                .entry(cx_id)
+                .or_default()
+                .push(ScheduledForeground::Runnable(runnable));
             unparker.unpark();
         });
         runnable.schedule();
@@ -123,24 +140,21 @@ impl Deterministic {
     }
 
     fn spawn(&self, future: AnyFuture) -> AnyTask {
-        let backtrace = Backtrace::new_unresolved();
         let state = self.state.clone();
         let unparker = self.parker.lock().unparker();
         let (runnable, task) = async_task::spawn(future, move |runnable| {
             let mut state = state.lock();
-            state
-                .scheduled_from_background
-                .push((runnable, backtrace.clone()));
+            state.scheduled_from_background.push(runnable);
             unparker.unpark();
         });
         runnable.schedule();
         task
     }
 
-    fn run(&self, mut future: AnyLocalFuture) -> Box<dyn Any> {
+    fn run(&self, cx_id: usize, mut future: AnyLocalFuture) -> Box<dyn Any> {
         let woken = Arc::new(AtomicBool::new(false));
         loop {
-            if let Some(result) = self.run_internal(woken.clone(), &mut future) {
+            if let Some(result) = self.run_internal(cx_id, woken.clone(), &mut future) {
                 return result;
             }
 
@@ -153,67 +167,92 @@ impl Deterministic {
         }
     }
 
-    fn run_until_parked(&self) {
+    fn run_until_parked(&self, cx_id: usize) {
         let woken = Arc::new(AtomicBool::new(false));
         let mut future = any_local_future(std::future::pending::<()>());
-        self.run_internal(woken, &mut future);
+        self.run_internal(cx_id, woken, &mut future);
     }
 
     fn run_internal(
         &self,
+        cx_id: usize,
         woken: Arc<AtomicBool>,
         future: &mut AnyLocalFuture,
     ) -> Option<Box<dyn Any>> {
         let unparker = self.parker.lock().unparker();
-        let waker = waker_fn(move || {
-            woken.store(true, SeqCst);
-            unparker.unpark();
+        let scheduled_main_future = Arc::new(AtomicBool::new(true));
+        self.state
+            .lock()
+            .scheduled_from_foreground
+            .entry(cx_id)
+            .or_default()
+            .insert(0, ScheduledForeground::MainFuture);
+
+        let waker = waker_fn({
+            let state = self.state.clone();
+            let scheduled_main_future = scheduled_main_future.clone();
+            move || {
+                woken.store(true, SeqCst);
+                if !scheduled_main_future.load(SeqCst) {
+                    scheduled_main_future.store(true, SeqCst);
+                    state
+                        .lock()
+                        .scheduled_from_foreground
+                        .entry(cx_id)
+                        .or_default()
+                        .push(ScheduledForeground::MainFuture);
+                }
+
+                unparker.unpark();
+            }
         });
 
         let mut cx = Context::from_waker(&waker);
-        let mut trace = Trace::default();
         loop {
             let mut state = self.state.lock();
-            let runnable_count = state.scheduled_from_foreground.len()
-                + state.scheduled_from_background.len()
-                + state.spawned_from_foreground.len();
 
-            let ix = state.rng.gen_range(0..=runnable_count);
-            if ix < state.scheduled_from_foreground.len() {
-                let (_, backtrace) = &state.scheduled_from_foreground[ix];
-                trace.record(&state, backtrace.clone());
-                let runnable = state.scheduled_from_foreground.remove(ix).0;
-                drop(state);
-                runnable.run();
-            } else if ix - state.scheduled_from_foreground.len()
-                < state.scheduled_from_background.len()
+            if state.scheduled_from_foreground.is_empty()
+                && state.scheduled_from_background.is_empty()
             {
-                let ix = ix - state.scheduled_from_foreground.len();
-                let (_, backtrace) = &state.scheduled_from_background[ix];
-                trace.record(&state, backtrace.clone());
-                let runnable = state.scheduled_from_background.remove(ix).0;
-                drop(state);
-                runnable.run();
-            } else if ix < runnable_count {
-                let (_, backtrace) = &state.spawned_from_foreground[0];
-                trace.record(&state, backtrace.clone());
-                let runnable = state.spawned_from_foreground.remove(0).0;
+                return None;
+            }
+
+            if !state.scheduled_from_background.is_empty() && state.rng.gen() {
+                let background_len = state.scheduled_from_background.len();
+                let ix = state.rng.gen_range(0..background_len);
+                let runnable = state.scheduled_from_background.remove(ix);
                 drop(state);
                 runnable.run();
-            } else {
-                drop(state);
-                if let Poll::Ready(result) = future.poll(&mut cx) {
-                    return Some(result);
+            } else if !state.scheduled_from_foreground.is_empty() {
+                let available_cx_ids = state
+                    .scheduled_from_foreground
+                    .keys()
+                    .copied()
+                    .collect::<Vec<_>>();
+                let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap();
+                let scheduled_from_cx = state
+                    .scheduled_from_foreground
+                    .get_mut(&cx_id_to_run)
+                    .unwrap();
+                let runnable = scheduled_from_cx.remove(0);
+                if scheduled_from_cx.is_empty() {
+                    state.scheduled_from_foreground.remove(&cx_id_to_run);
                 }
 
-                let state = self.state.lock();
-
-                if state.scheduled_from_foreground.is_empty()
-                    && state.scheduled_from_background.is_empty()
-                    && state.spawned_from_foreground.is_empty()
-                {
-                    return None;
+                drop(state);
+                match runnable {
+                    ScheduledForeground::MainFuture => {
+                        scheduled_main_future.store(false, SeqCst);
+                        if let Poll::Ready(result) = future.poll(&mut cx) {
+                            return Some(result);
+                        }
+                    }
+                    ScheduledForeground::Runnable(runnable) => {
+                        runnable.run();
+                    }
                 }
+            } else {
+                return None;
             }
         }
     }
@@ -230,15 +269,12 @@ impl Deterministic {
         };
 
         let mut cx = Context::from_waker(&waker);
-        let mut trace = Trace::default();
         for _ in 0..max_ticks {
             let mut state = self.state.lock();
             let runnable_count = state.scheduled_from_background.len();
             let ix = state.rng.gen_range(0..=runnable_count);
             if ix < state.scheduled_from_background.len() {
-                let (_, backtrace) = &state.scheduled_from_background[ix];
-                trace.record(&state, backtrace.clone());
-                let runnable = state.scheduled_from_background.remove(ix).0;
+                let runnable = state.scheduled_from_background.remove(ix);
                 drop(state);
                 runnable.run();
             } else {
@@ -281,69 +317,13 @@ impl DeterministicState {
     }
 }
 
-#[derive(Default)]
-struct Trace {
-    executed: Vec<Backtrace>,
-    scheduled: Vec<Vec<Backtrace>>,
-    spawned_from_foreground: Vec<Vec<Backtrace>>,
-}
-
-impl Trace {
-    fn record(&mut self, state: &DeterministicState, executed: Backtrace) {
-        self.scheduled.push(
-            state
-                .scheduled_from_foreground
-                .iter()
-                .map(|(_, backtrace)| backtrace.clone())
-                .collect(),
-        );
-        self.spawned_from_foreground.push(
-            state
-                .spawned_from_foreground
-                .iter()
-                .map(|(_, backtrace)| backtrace.clone())
-                .collect(),
-        );
-        self.executed.push(executed);
-    }
-
-    fn resolve(&mut self) {
-        for backtrace in &mut self.executed {
-            backtrace.resolve();
-        }
-
-        for backtraces in &mut self.scheduled {
-            for backtrace in backtraces {
-                backtrace.resolve();
-            }
-        }
-
-        for backtraces in &mut self.spawned_from_foreground {
-            for backtrace in backtraces {
-                backtrace.resolve();
-            }
-        }
-    }
-}
-
 struct CwdBacktrace<'a> {
     backtrace: &'a Backtrace,
-    first_frame_only: bool,
 }
 
 impl<'a> CwdBacktrace<'a> {
     fn new(backtrace: &'a Backtrace) -> Self {
-        Self {
-            backtrace,
-            first_frame_only: false,
-        }
-    }
-
-    fn first_frame(backtrace: &'a Backtrace) -> Self {
-        Self {
-            backtrace,
-            first_frame_only: true,
-        }
+        Self { backtrace }
     }
 }
 
@@ -362,69 +342,12 @@ impl<'a> Debug for CwdBacktrace<'a> {
                 .any(|s| s.filename().map_or(false, |f| f.starts_with(&cwd)))
             {
                 formatted_frame.backtrace_frame(frame)?;
-                if self.first_frame_only {
-                    break;
-                }
             }
         }
         fmt.finish()
     }
 }
 
-impl Debug for Trace {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        for ((backtrace, scheduled), spawned_from_foreground) in self
-            .executed
-            .iter()
-            .zip(&self.scheduled)
-            .zip(&self.spawned_from_foreground)
-        {
-            writeln!(f, "Scheduled")?;
-            for backtrace in scheduled {
-                writeln!(f, "- {:?}", CwdBacktrace::first_frame(backtrace))?;
-            }
-            if scheduled.is_empty() {
-                writeln!(f, "None")?;
-            }
-            writeln!(f, "==========")?;
-
-            writeln!(f, "Spawned from foreground")?;
-            for backtrace in spawned_from_foreground {
-                writeln!(f, "- {:?}", CwdBacktrace::first_frame(backtrace))?;
-            }
-            if spawned_from_foreground.is_empty() {
-                writeln!(f, "None")?;
-            }
-            writeln!(f, "==========")?;
-
-            writeln!(f, "Run: {:?}", CwdBacktrace::first_frame(backtrace))?;
-            writeln!(f, "+++++++++++++++++++")?;
-        }
-
-        Ok(())
-    }
-}
-
-impl Drop for Trace {
-    fn drop(&mut self) {
-        let trace_on_panic = if let Ok(trace_on_panic) = std::env::var("EXECUTOR_TRACE_ON_PANIC") {
-            trace_on_panic == "1" || trace_on_panic == "true"
-        } else {
-            false
-        };
-        let trace_always = if let Ok(trace_always) = std::env::var("EXECUTOR_TRACE_ALWAYS") {
-            trace_always == "1" || trace_always == "true"
-        } else {
-            false
-        };
-
-        if trace_always || (trace_on_panic && thread::panicking()) {
-            self.resolve();
-            dbg!(self);
-        }
-    }
-}
-
 impl Foreground {
     pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
         if dispatcher.is_main_thread() {
@@ -440,7 +363,9 @@ impl Foreground {
     pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
         let future = any_local_future(future);
         let any_task = match self {
-            Self::Deterministic(executor) => executor.spawn_from_foreground(future),
+            Self::Deterministic { cx_id, executor } => {
+                executor.spawn_from_foreground(*cx_id, future)
+            }
             Self::Platform { dispatcher, .. } => {
                 fn spawn_inner(
                     future: AnyLocalFuture,
@@ -462,7 +387,7 @@ impl Foreground {
     pub fn run<T: 'static>(&self, future: impl 'static + Future<Output = T>) -> T {
         let future = any_local_future(future);
         let any_value = match self {
-            Self::Deterministic(executor) => executor.run(future),
+            Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future),
             Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"),
         };
         *any_value.downcast().unwrap()
@@ -470,14 +395,14 @@ impl Foreground {
 
     pub fn parking_forbidden(&self) -> bool {
         match self {
-            Self::Deterministic(executor) => executor.state.lock().forbid_parking,
+            Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking,
             _ => panic!("this method can only be called on a deterministic executor"),
         }
     }
 
     pub fn start_waiting(&self) {
         match self {
-            Self::Deterministic(executor) => {
+            Self::Deterministic { executor, .. } => {
                 executor.state.lock().waiting_backtrace = Some(Backtrace::new_unresolved());
             }
             _ => panic!("this method can only be called on a deterministic executor"),
@@ -486,7 +411,7 @@ impl Foreground {
 
     pub fn finish_waiting(&self) {
         match self {
-            Self::Deterministic(executor) => {
+            Self::Deterministic { executor, .. } => {
                 executor.state.lock().waiting_backtrace.take();
             }
             _ => panic!("this method can only be called on a deterministic executor"),
@@ -495,7 +420,7 @@ impl Foreground {
 
     pub fn forbid_parking(&self) {
         match self {
-            Self::Deterministic(executor) => {
+            Self::Deterministic { executor, .. } => {
                 let mut state = executor.state.lock();
                 state.forbid_parking = true;
                 state.rng = StdRng::seed_from_u64(state.seed);
@@ -506,7 +431,7 @@ impl Foreground {
 
     pub async fn timer(&self, duration: Duration) {
         match self {
-            Self::Deterministic(executor) => {
+            Self::Deterministic { executor, .. } => {
                 let (tx, mut rx) = barrier::channel();
                 {
                     let mut state = executor.state.lock();
@@ -523,8 +448,8 @@ impl Foreground {
 
     pub fn advance_clock(&self, duration: Duration) {
         match self {
-            Self::Deterministic(executor) => {
-                executor.run_until_parked();
+            Self::Deterministic { cx_id, executor } => {
+                executor.run_until_parked(*cx_id);
 
                 let mut state = executor.state.lock();
                 state.now += duration;
@@ -541,7 +466,7 @@ impl Foreground {
 
     pub fn set_block_on_ticks(&self, range: RangeInclusive<usize>) {
         match self {
-            Self::Deterministic(executor) => executor.state.lock().block_on_ticks = range,
+            Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range,
             _ => panic!("this method can only be called on a deterministic executor"),
         }
     }
@@ -579,7 +504,7 @@ impl Background {
         let future = any_future(future);
         let any_task = match self {
             Self::Production { executor, .. } => executor.spawn(future),
-            Self::Deterministic { executor, .. } => executor.spawn(future),
+            Self::Deterministic { executor } => executor.spawn(future),
         };
         Task::send(any_task)
     }
@@ -646,14 +571,6 @@ impl<'a> Scope<'a> {
     }
 }
 
-pub fn deterministic(seed: u64) -> (Rc<Foreground>, Arc<Background>) {
-    let executor = Arc::new(Deterministic::new(seed));
-    (
-        Rc::new(Foreground::Deterministic(executor.clone())),
-        Arc::new(Background::Deterministic { executor }),
-    )
-}
-
 impl<T> Task<T> {
     pub fn ready(value: T) -> Self {
         Self::Ready(Some(value))

crates/gpui/src/test.rs 🔗

@@ -28,7 +28,12 @@ pub fn run_test(
     mut starting_seed: u64,
     max_retries: usize,
     test_fn: &mut (dyn RefUnwindSafe
-              + Fn(&mut MutableAppContext, Rc<platform::test::ForegroundPlatform>, u64)),
+              + Fn(
+        &mut MutableAppContext,
+        Rc<platform::test::ForegroundPlatform>,
+        Arc<executor::Deterministic>,
+        u64,
+    )),
 ) {
     let is_randomized = num_iterations > 1;
     if is_randomized {
@@ -60,16 +65,16 @@ pub fn run_test(
                     dbg!(seed);
                 }
 
-                let (foreground, background) = executor::deterministic(seed);
+                let deterministic = executor::Deterministic::new(seed);
                 let mut cx = TestAppContext::new(
                     foreground_platform.clone(),
                     platform.clone(),
-                    foreground.clone(),
-                    background.clone(),
+                    deterministic.build_foreground(usize::MAX),
+                    deterministic.build_background(),
                     font_cache.clone(),
                     0,
                 );
-                cx.update(|cx| test_fn(cx, foreground_platform.clone(), seed));
+                cx.update(|cx| test_fn(cx, foreground_platform.clone(), deterministic, seed));
 
                 atomic_seed.fetch_add(1, SeqCst);
             }

crates/gpui_macros/src/gpui_macros.rs 🔗

@@ -77,8 +77,8 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream {
                                 #namespace::TestAppContext::new(
                                     foreground_platform.clone(),
                                     cx.platform().clone(),
-                                    cx.foreground().clone(),
-                                    cx.background().clone(),
+                                    deterministic.build_foreground(#ix),
+                                    deterministic.build_background(),
                                     cx.font_cache().clone(),
                                     #first_entity_id,
                                 ),
@@ -115,7 +115,7 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream {
                     #num_iterations as u64,
                     #starting_seed as u64,
                     #max_retries,
-                    &mut |cx, foreground_platform, seed| cx.foreground().run(#inner_fn_name(#inner_fn_args))
+                    &mut |cx, foreground_platform, deterministic, seed| cx.foreground().run(#inner_fn_name(#inner_fn_args))
                 );
             }
         }
@@ -147,7 +147,7 @@ pub fn test(args: TokenStream, function: TokenStream) -> TokenStream {
                     #num_iterations as u64,
                     #starting_seed as u64,
                     #max_retries,
-                    &mut |cx, _, seed| #inner_fn_name(#inner_fn_args)
+                    &mut |cx, _, _, seed| #inner_fn_name(#inner_fn_args)
                 );
             }
         }

crates/server/src/rpc.rs 🔗

@@ -1794,7 +1794,7 @@ mod tests {
         });
     }
 
-    #[gpui::test]
+    #[gpui::test(iterations = 100)]
     async fn test_editing_while_guest_opens_buffer(
         mut cx_a: TestAppContext,
         mut cx_b: TestAppContext,