WIP - Panic immediately when detecting non-determinism via a change to the execution trace

Max Brunsfeld created

Change summary

crates/client/src/client.rs                             |   8 
crates/collab/src/executor.rs                           |   8 
crates/collab/src/rpc.rs                                |  10 
crates/collab/src/tests/randomized_integration_tests.rs |   2 
crates/gpui/src/executor.rs                             | 110 ++++++++++
crates/gpui/src/test.rs                                 |  16 +
6 files changed, 138 insertions(+), 16 deletions(-)

Detailed changes

crates/client/src/client.rs 🔗

@@ -851,6 +851,7 @@ impl Client {
             })
             .detach();
 
+        let t0 = Instant::now();
         let this = self.clone();
         let cx = cx.clone();
         cx.foreground()
@@ -867,7 +868,12 @@ impl Client {
                         }
                     }
                     Err(err) => {
-                        log::error!("connection error: {:?}", err);
+                        // TODO - remove. Make the test's non-determinism more apparent by
+                        // only sometimes formatting this stack trace.
+                        if Instant::now().duration_since(t0).as_nanos() % 2 == 0 {
+                            log::error!("connection error: {:?}", err);
+                        }
+
                         this.set_status(Status::ConnectionLost, &cx);
                     }
                 }

crates/collab/src/executor.rs 🔗

@@ -33,4 +33,12 @@ impl Executor {
             }
         }
     }
+
+    /// Forwards to the deterministic background executor's `record_backtrace`.
+    ///
+    /// Does nothing for `Executor::Production`. The `Executor::Deterministic` arm is only
+    /// compiled under `cfg(test)`.
+    pub fn record_backtrace(&self) {
+        match self {
+            Executor::Production => {}
+            #[cfg(test)]
+            Executor::Deterministic(background) => background.record_backtrace(),
+        }
+    }
 }

crates/collab/src/rpc.rs 🔗

@@ -95,6 +95,7 @@ struct Session {
     peer: Arc<Peer>,
     connection_pool: Arc<parking_lot::Mutex<ConnectionPool>>,
     live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
+    executor: Executor,
 }
 
 impl Session {
@@ -521,7 +522,8 @@ impl Server {
                 db: Arc::new(tokio::sync::Mutex::new(DbHandle(this.app_state.db.clone()))),
                 peer: this.peer.clone(),
                 connection_pool: this.connection_pool.clone(),
-                live_kit_client: this.app_state.live_kit_client.clone()
+                live_kit_client: this.app_state.live_kit_client.clone(),
+                executor: executor.clone(),
             };
             update_user_contacts(user_id, &session).await?;
 
@@ -1515,6 +1517,7 @@ async fn update_language_server(
     request: proto::UpdateLanguageServer,
     session: Session,
 ) -> Result<()> {
+    session.executor.record_backtrace();
     let project_id = ProjectId::from_proto(request.project_id);
     let project_connection_ids = session
         .db()
@@ -1541,6 +1544,7 @@ async fn forward_project_request<T>(
 where
     T: EntityMessage + RequestMessage,
 {
+    session.executor.record_backtrace();
     let project_id = ProjectId::from_proto(request.remote_entity_id());
     let host_connection_id = {
         let collaborators = session
@@ -1609,6 +1613,7 @@ async fn create_buffer_for_peer(
     request: proto::CreateBufferForPeer,
     session: Session,
 ) -> Result<()> {
+    session.executor.record_backtrace();
     let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?;
     session
         .peer
@@ -1621,6 +1626,7 @@ async fn update_buffer(
     response: Response<proto::UpdateBuffer>,
     session: Session,
 ) -> Result<()> {
+    session.executor.record_backtrace();
     let project_id = ProjectId::from_proto(request.project_id);
     let project_connection_ids = session
         .db()
@@ -1628,6 +1634,8 @@ async fn update_buffer(
         .project_connection_ids(project_id, session.connection_id)
         .await?;
 
+    session.executor.record_backtrace();
+
     broadcast(
         session.connection_id,
         project_connection_ids.iter().copied(),

crates/collab/src/tests/randomized_integration_tests.rs 🔗

@@ -17,7 +17,7 @@ use project::{search::SearchQuery, Project};
 use rand::prelude::*;
 use std::{env, path::PathBuf, sync::Arc};
 
-#[gpui::test(iterations = 100)]
+#[gpui::test(iterations = 100, detect_nondeterminism = true)]
 async fn test_random_collaboration(
     cx: &mut TestAppContext,
     deterministic: Arc<Deterministic>,

crates/gpui/src/executor.rs 🔗

@@ -4,7 +4,7 @@ use futures::channel::mpsc;
 use smol::{channel, prelude::*, Executor};
 use std::{
     any::Any,
-    fmt::{self, Display},
+    fmt::{self, Display, Write as _},
     marker::PhantomData,
     mem,
     pin::Pin,
@@ -17,7 +17,8 @@ use std::{
 
 use crate::{
     platform::{self, Dispatcher},
-    util, MutableAppContext,
+    util::{self, CwdBacktrace},
+    MutableAppContext,
 };
 
 pub enum Foreground {
@@ -74,11 +75,18 @@ struct DeterministicState {
     pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>,
     waiting_backtrace: Option<backtrace::Backtrace>,
     next_runnable_id: usize,
-    poll_history: Vec<usize>,
+    poll_history: Vec<ExecutorEvent>,
+    previous_poll_history: Option<Vec<ExecutorEvent>>,
     enable_runnable_backtraces: bool,
     runnable_backtraces: collections::HashMap<usize, backtrace::Backtrace>,
 }
 
+/// A single entry in the deterministic executor's execution history.
+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+pub enum ExecutorEvent {
+    /// The runnable with this id was taken off a scheduling queue to be run.
+    PollRunnable { id: usize },
+    /// The runnable with this id was scheduled by its task's schedule callback.
+    // NOTE(review): "Enqueu" looks like a misspelling of "Enqueue"; a rename has to touch
+    // every use site of this variant at once.
+    EnqueuRunnable { id: usize },
+}
+
 #[cfg(any(test, feature = "test-support"))]
 struct ForegroundRunnable {
     id: usize,
@@ -130,6 +138,7 @@ impl Deterministic {
                 waiting_backtrace: None,
                 next_runnable_id: 0,
                 poll_history: Default::default(),
+                previous_poll_history: Default::default(),
                 enable_runnable_backtraces: false,
                 runnable_backtraces: Default::default(),
             })),
@@ -137,10 +146,14 @@ impl Deterministic {
         })
     }
 
-    pub fn runnable_history(&self) -> Vec<usize> {
+    /// Returns a copy of the events recorded so far in this executor's history.
+    pub fn execution_history(&self) -> Vec<ExecutorEvent> {
         self.state.lock().poll_history.clone()
     }
 
+    /// Stores the execution history of a previous run, or clears it when given `None`.
+    ///
+    /// While a previous history is set, `DeterministicState::push_to_history` compares each
+    /// event it records against the entry at the same position in that history.
+    pub fn set_previous_execution_history(&self, history: Option<Vec<ExecutorEvent>>) {
+        self.state.lock().previous_poll_history = history;
+    }
+
     pub fn enable_runnable_backtrace(&self) {
         self.state.lock().enable_runnable_backtraces = true;
     }
@@ -185,6 +198,9 @@ impl Deterministic {
         let unparker = self.parker.lock().unparker();
         let (runnable, task) = async_task::spawn_local(future, move |runnable| {
             let mut state = state.lock();
+            state
+                .poll_history
+                .push(ExecutorEvent::EnqueuRunnable { id });
             state
                 .scheduled_from_foreground
                 .entry(cx_id)
@@ -212,6 +228,9 @@ impl Deterministic {
         let unparker = self.parker.lock().unparker();
         let (runnable, task) = async_task::spawn(future, move |runnable| {
             let mut state = state.lock();
+            state
+                .poll_history
+                .push(ExecutorEvent::EnqueuRunnable { id });
             state
                 .scheduled_from_background
                 .push(BackgroundRunnable { id, runnable });
@@ -314,7 +333,9 @@ impl Deterministic {
                 let background_len = state.scheduled_from_background.len();
                 let ix = state.rng.gen_range(0..background_len);
                 let background_runnable = state.scheduled_from_background.remove(ix);
-                state.poll_history.push(background_runnable.id);
+                state.push_to_history(ExecutorEvent::PollRunnable {
+                    id: background_runnable.id,
+                });
                 drop(state);
                 background_runnable.runnable.run();
             } else if !state.scheduled_from_foreground.is_empty() {
@@ -332,7 +353,9 @@ impl Deterministic {
                 if scheduled_from_cx.is_empty() {
                     state.scheduled_from_foreground.remove(&cx_id_to_run);
                 }
-                state.poll_history.push(foreground_runnable.id);
+                state.push_to_history(ExecutorEvent::PollRunnable {
+                    id: foreground_runnable.id,
+                });
 
                 drop(state);
 
@@ -366,7 +389,9 @@ impl Deterministic {
             let ix = state.rng.gen_range(0..=runnable_count);
             if ix < state.scheduled_from_background.len() {
                 let background_runnable = state.scheduled_from_background.remove(ix);
-                state.poll_history.push(background_runnable.id);
+                state.push_to_history(ExecutorEvent::PollRunnable {
+                    id: background_runnable.id,
+                });
                 drop(state);
                 background_runnable.runnable.run();
             } else {
@@ -465,6 +490,25 @@ impl Deterministic {
             }
         }
     }
+
+    pub fn record_backtrace(&self) {
+        let mut state = self.state.lock();
+        if state.enable_runnable_backtraces {
+            let current_id = state
+                .poll_history
+                .iter()
+                .rev()
+                .find_map(|event| match event {
+                    ExecutorEvent::PollRunnable { id } => Some(*id),
+                    _ => None,
+                });
+            if let Some(id) = current_id {
+                state
+                    .runnable_backtraces
+                    .insert(id, backtrace::Backtrace::new_unresolved());
+            }
+        }
+    }
 }
 
 impl Drop for Timer {
@@ -506,6 +550,38 @@ impl Future for Timer {
 
 #[cfg(any(test, feature = "test-support"))]
 impl DeterministicState {
+    /// Appends `event` to the execution history and, if a history from a previous run was
+    /// supplied, checks that it matches the entry at the same position in that history.
+    ///
+    /// # Panics
+    ///
+    /// Panics with a "detected non-determinism" message, including whatever backtraces were
+    /// recorded for the runnables involved, as soon as the two histories diverge. A current
+    /// run that records more events than the previous one counts as a divergence.
+    fn push_to_history(&mut self, event: ExecutorEvent) {
+        self.poll_history.push(event);
+        if let Some(prev_history) = &self.previous_poll_history {
+            let ix = self.poll_history.len() - 1;
+            // Checked lookup: the current run may outlast the previous one, and plain
+            // indexing would then abort with an opaque out-of-bounds panic instead of
+            // reporting the divergence.
+            let prev_event = prev_history.get(ix).copied();
+            if prev_event != Some(event) {
+                let mut message = String::new();
+                writeln!(
+                    &mut message,
+                    "current runnable backtrace:\n{:?}",
+                    self.runnable_backtraces.get_mut(&event.id()).map(|trace| {
+                        trace.resolve();
+                        CwdBacktrace(trace)
+                    })
+                )
+                .unwrap();
+                // Without a previous event there is no previous backtrace; prints `None`.
+                let prev_trace = match prev_event {
+                    Some(prev_event) => self.runnable_backtraces.get_mut(&prev_event.id()),
+                    None => None,
+                };
+                writeln!(
+                    &mut message,
+                    "previous runnable backtrace:\n{:?}",
+                    prev_trace.map(|trace| {
+                        trace.resolve();
+                        CwdBacktrace(trace)
+                    })
+                )
+                .unwrap();
+                panic!("detected non-determinism after {ix}. {message}");
+            }
+        }
+    }
+
     fn will_park(&mut self) {
         if self.forbid_parking {
             let mut backtrace_message = String::new();
@@ -526,6 +602,16 @@ impl DeterministicState {
     }
 }
 
+#[cfg(any(test, feature = "test-support"))]
+impl ExecutorEvent {
+    /// Returns the id of the runnable this event refers to, whichever variant it is.
+    pub fn id(&self) -> usize {
+        match *self {
+            ExecutorEvent::PollRunnable { id } | ExecutorEvent::EnqueuRunnable { id } => id,
+        }
+    }
+}
+
 impl Foreground {
     pub fn platform(dispatcher: Arc<dyn platform::Dispatcher>) -> Result<Self> {
         if dispatcher.is_main_thread() {
@@ -755,6 +841,16 @@ impl Background {
             }
         }
     }
+
+    /// Forwards to the deterministic executor's `record_backtrace`.
+    ///
+    /// # Panics
+    ///
+    /// Panics when called on any variant other than `Self::Deterministic`.
+    #[cfg(any(test, feature = "test-support"))]
+    pub fn record_backtrace(&self) {
+        if let Self::Deterministic { executor, .. } = self {
+            executor.record_backtrace()
+        } else {
+            panic!("this method can only be called on a deterministic executor")
+        }
+    }
 }
 
 impl Default for Background {

crates/gpui/src/test.rs 🔗

@@ -1,7 +1,10 @@
 use crate::{
-    elements::Empty, executor, platform, util::CwdBacktrace, Element, ElementBox, Entity,
-    FontCache, Handle, LeakDetector, MutableAppContext, Platform, RenderContext, Subscription,
-    TestAppContext, View,
+    elements::Empty,
+    executor::{self, ExecutorEvent},
+    platform,
+    util::CwdBacktrace,
+    Element, ElementBox, Entity, FontCache, Handle, LeakDetector, MutableAppContext, Platform,
+    RenderContext, Subscription, TestAppContext, View,
 };
 use futures::StreamExt;
 use parking_lot::Mutex;
@@ -62,7 +65,7 @@ pub fn run_test(
             let platform = Arc::new(platform::test::platform());
             let font_system = platform.fonts();
             let font_cache = Arc::new(FontCache::new(font_system));
-            let mut prev_runnable_history: Option<Vec<usize>> = None;
+            let mut prev_runnable_history: Option<Vec<ExecutorEvent>> = None;
 
             for _ in 0..num_iterations {
                 let seed = atomic_seed.load(SeqCst);
@@ -73,6 +76,7 @@ pub fn run_test(
 
                 let deterministic = executor::Deterministic::new(seed);
                 if detect_nondeterminism {
+                    deterministic.set_previous_execution_history(prev_runnable_history.clone());
                     deterministic.enable_runnable_backtrace();
                 }
 
@@ -98,7 +102,7 @@ pub fn run_test(
                 leak_detector.lock().detect();
 
                 if detect_nondeterminism {
-                    let curr_runnable_history = deterministic.runnable_history();
+                    let curr_runnable_history = deterministic.execution_history();
                     if let Some(prev_runnable_history) = prev_runnable_history {
                         let mut prev_entries = prev_runnable_history.iter().fuse();
                         let mut curr_entries = curr_runnable_history.iter().fuse();
@@ -138,7 +142,7 @@ pub fn run_test(
 
                             let last_common_backtrace = common_history_prefix
                                 .last()
-                                .map(|runnable_id| deterministic.runnable_backtrace(*runnable_id));
+                                .map(|event| deterministic.runnable_backtrace(event.id()));
 
                             writeln!(
                                 &mut error,