Add `DeterministicExecutor::block_on(duration, future)`

Antonio Scandurra , Nathan Sobo , and Max Brunsfeld created

Co-Authored-By: Nathan Sobo <nathan@zed.dev>
Co-Authored-By: Max Brunsfeld <max@zed.dev>

Change summary

Cargo.lock           |  1 
gpui/Cargo.toml      |  1 
gpui/src/executor.rs | 81 +++++++++++++++++++++++++++------------------
3 files changed, 50 insertions(+), 33 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -2159,6 +2159,7 @@ dependencies = [
  "etagere",
  "font-kit",
  "foreign-types",
+ "futures",
  "gpui_macros",
  "log",
  "metal",

gpui/Cargo.toml 🔗

@@ -9,6 +9,7 @@ async-task = "4.0.3"
 backtrace = "0.3"
 ctor = "0.1"
 etagere = "0.2"
+futures = "0.3"
 gpui_macros = { path = "../gpui_macros" }
 log = "0.4"
 num_cpus = "1.13"

gpui/src/executor.rs 🔗

@@ -2,6 +2,7 @@ use anyhow::{anyhow, Result};
 use async_task::Runnable;
 pub use async_task::Task;
 use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString};
+use futures::task::noop_waker;
 use parking_lot::Mutex;
 use rand::prelude::*;
 use smol::{channel, prelude::*, Executor};
@@ -13,13 +14,14 @@ use std::{
     rc::Rc,
     sync::{
         atomic::{AtomicBool, Ordering::SeqCst},
-        mpsc::Sender,
         Arc,
     },
+    task::{Context, Poll},
     thread,
+    time::Duration,
 };
 
-use crate::platform;
+use crate::{platform, util};
 
 pub enum Foreground {
     Platform {
@@ -43,7 +45,6 @@ struct DeterministicState {
     seed: u64,
     scheduled: Vec<(Runnable, Backtrace)>,
     spawned_from_foreground: Vec<(Runnable, Backtrace)>,
-    waker: Option<Sender<()>>,
 }
 
 pub struct Deterministic(Arc<Mutex<DeterministicState>>);
@@ -55,7 +56,6 @@ impl Deterministic {
             seed,
             scheduled: Default::default(),
             spawned_from_foreground: Default::default(),
-            waker: None,
         })))
     }
 
@@ -75,9 +75,6 @@ impl Deterministic {
             } else {
                 state.spawned_from_foreground.push((runnable, backtrace));
             }
-            if let Some(waker) = state.waker.as_ref() {
-                waker.send(()).ok();
-            }
         });
         runnable.schedule();
         task
@@ -91,11 +88,7 @@ impl Deterministic {
         let backtrace = Backtrace::new_unresolved();
         let state = self.0.clone();
         let (runnable, task) = async_task::spawn(future, move |runnable| {
-            let mut state = state.lock();
-            state.scheduled.push((runnable, backtrace.clone()));
-            if let Some(waker) = state.waker.as_ref() {
-                waker.send(()).ok();
-            }
+            state.lock().scheduled.push((runnable, backtrace.clone()));
         });
         runnable.schedule();
         task
@@ -106,43 +99,49 @@ impl Deterministic {
         T: 'static,
         F: Future<Output = T> + 'static,
     {
-        let (wake_tx, wake_rx) = std::sync::mpsc::channel();
-        let state = self.0.clone();
-        state.lock().waker = Some(wake_tx);
+        self.block_on(usize::MAX, future).unwrap()
+    }
 
-        let (output_tx, output_rx) = std::sync::mpsc::channel();
-        self.spawn_from_foreground(async move {
-            let output = future.await;
-            output_tx.send(output).unwrap();
-        })
-        .detach();
+    pub fn block_on<F, T>(&self, max_ticks: usize, future: F) -> Option<T>
+    where
+        T: 'static,
+        F: Future<Output = T>,
+    {
+        smol::pin!(future);
 
+        let waker = noop_waker();
+        let mut cx = Context::from_waker(&waker);
         let mut trace = Trace::default();
-        loop {
-            if let Ok(value) = output_rx.try_recv() {
-                state.lock().waker = None;
-                return value;
-            }
-
-            wake_rx.recv().unwrap();
+        for _ in 0..max_ticks {
             let runnable = {
-                let state = &mut *state.lock();
-                let ix = state
-                    .rng
-                    .gen_range(0..state.scheduled.len() + state.spawned_from_foreground.len());
+                let state = &mut *self.0.lock();
+                let runnable_count = state.scheduled.len() + state.spawned_from_foreground.len();
+                let ix = state.rng.gen_range(0..=runnable_count);
                 if ix < state.scheduled.len() {
                     let (_, backtrace) = &state.scheduled[ix];
                     trace.record(&state, backtrace.clone());
                     state.scheduled.remove(ix).0
-                } else {
+                } else if ix < runnable_count {
                     let (_, backtrace) = &state.spawned_from_foreground[0];
                     trace.record(&state, backtrace.clone());
                     state.spawned_from_foreground.remove(0).0
+                } else {
+                    if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
+                        return Some(result);
+                    }
+
+                    if state.scheduled.is_empty() && state.spawned_from_foreground.is_empty() {
+                        panic!("detected non-determinism in deterministic executor");
+                    } else {
+                        continue;
+                    }
                 }
             };
 
             runnable.run();
         }
+
+        None
     }
 }
 
@@ -354,6 +353,22 @@ impl Background {
         }
     }
 
+    pub fn block_on<F, T>(&self, timeout: Duration, future: F) -> Option<T>
+    where
+        T: 'static,
+        F: Future<Output = T>,
+    {
+        match self {
+            Self::Production { .. } => {
+                smol::block_on(async move { util::timeout(timeout, future).await.ok() })
+            }
+            Self::Deterministic(executor) => {
+                let max_ticks = executor.0.lock().rng.gen_range(1..=1000);
+                executor.block_on(max_ticks, future)
+            }
+        }
+    }
+
     pub async fn scoped<'scope, F>(&self, scheduler: F)
     where
         F: FnOnce(&mut Scope<'scope>),