Finish implementing DeterministicExecutor::advance_clock

Max Brunsfeld created

* Start by running all non-timer futures to completion, to ensure that
  timers have a chance to be registered.
* Release executor's state lock before waking any timers

Change summary

gpui/src/executor.rs | 60 +++++++++++++++++++++++++++++++--------------
zed/src/rpc.rs       |  8 ++---
2 files changed, 44 insertions(+), 24 deletions(-)

Detailed changes

gpui/src/executor.rs 🔗

@@ -124,17 +124,39 @@ impl Deterministic {
         T: 'static,
         F: Future<Output = T> + 'static,
     {
+        let woken = Arc::new(AtomicBool::new(false));
+        let mut future = Box::pin(future);
+        loop {
+            if let Some(result) = self.run_internal(woken.clone(), &mut future) {
+                return result;
+            }
+
+            if !woken.load(SeqCst) && self.state.lock().forbid_parking {
+                panic!("deterministic executor parked after a call to forbid_parking");
+            }
+
+            woken.store(false, SeqCst);
+            self.parker.lock().park();
+        }
+    }
+
+    fn run_until_parked(&self) {
+        let woken = Arc::new(AtomicBool::new(false));
+        let future = std::future::pending::<()>();
         smol::pin!(future);
+        self.run_internal(woken, future);
+    }
 
+    pub fn run_internal<F, T>(&self, woken: Arc<AtomicBool>, mut future: F) -> Option<T>
+    where
+        T: 'static,
+        F: Future<Output = T> + Unpin,
+    {
         let unparker = self.parker.lock().unparker();
-        let woken = Arc::new(AtomicBool::new(false));
-        let waker = {
-            let woken = woken.clone();
-            waker_fn(move || {
-                woken.store(true, SeqCst);
-                unparker.unpark();
-            })
-        };
+        let waker = waker_fn(move || {
+            woken.store(true, SeqCst);
+            unparker.unpark();
+        });
 
         let mut cx = Context::from_waker(&waker);
         let mut trace = Trace::default();
@@ -168,23 +190,17 @@ impl Deterministic {
                 runnable.run();
             } else {
                 drop(state);
-                if let Poll::Ready(result) = future.as_mut().poll(&mut cx) {
-                    return result;
+                if let Poll::Ready(result) = future.poll(&mut cx) {
+                    return Some(result);
                 }
+
                 let state = self.state.lock();
                 if state.scheduled_from_foreground.is_empty()
                     && state.scheduled_from_background.is_empty()
                     && state.spawned_from_foreground.is_empty()
                 {
-                    if state.forbid_parking && !woken.load(SeqCst) {
-                        panic!("deterministic executor parked after a call to forbid_parking");
-                    }
-                    drop(state);
-                    woken.store(false, SeqCst);
-                    self.parker.lock().park();
+                    return None;
                 }
-
-                continue;
             }
         }
     }
@@ -432,10 +448,16 @@ impl Foreground {
     pub fn advance_clock(&self, duration: Duration) {
         match self {
             Self::Deterministic(executor) => {
+                executor.run_until_parked();
+
                 let mut state = executor.state.lock();
                 state.now += duration;
                 let now = state.now;
-                state.pending_sleeps.retain(|(wakeup, _)| *wakeup > now);
+                let mut pending_sleeps = mem::take(&mut state.pending_sleeps);
+                drop(state);
+
+                pending_sleeps.retain(|(wakeup, _)| *wakeup > now);
+                executor.state.lock().pending_sleeps.extend(pending_sleeps);
             }
             _ => panic!("this method can only be called on a deterministic executor"),
         }

zed/src/rpc.rs 🔗

@@ -3,12 +3,10 @@ use anyhow::{anyhow, Context, Result};
 use async_tungstenite::tungstenite::{
     http::Request, Error as WebSocketError, Message as WebSocketMessage,
 };
-use futures::StreamExt as _;
 use gpui::{AsyncAppContext, Entity, ModelContext, Task};
 use lazy_static::lazy_static;
 use parking_lot::RwLock;
 use postage::{prelude::Stream, watch};
-use smol::Timer;
 use std::{
     any::TypeId,
     collections::HashMap,
@@ -490,18 +488,18 @@ mod tests {
     use crate::test::FakeServer;
     use gpui::TestAppContext;
 
-    #[gpui::test(iterations = 1000)]
+    #[gpui::test(iterations = 10)]
     async fn test_heartbeat(cx: TestAppContext) {
         let user_id = 5;
         let client = Client::new();
-
-        client.state.write().heartbeat_interval = Duration::from_millis(1);
         let mut server = FakeServer::for_client(user_id, &client, &cx).await;
 
+        cx.foreground().advance_clock(Duration::from_secs(10));
         let ping = server.receive::<proto::Ping>().await.unwrap();
         assert_eq!(ping.payload.id, 0);
         server.respond(ping.receipt(), proto::Pong { id: 0 }).await;
 
+        cx.foreground().advance_clock(Duration::from_secs(10));
         let ping = server.receive::<proto::Ping>().await.unwrap();
         assert_eq!(ping.payload.id, 1);
         server.respond(ping.receipt(), proto::Pong { id: 1 }).await;