From d3d1be4fbad1813345e7dee3f1f68ff97766d988 Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Sun, 31 Aug 2025 19:06:23 -0600 Subject: [PATCH] Update TestScheduler design with SchedulerConfig, VariationSource enum, and SchedulerFuzzInput for fuzzing support --- ...25-08-31_15-47-17_test-scheduler-design.md | 441 ++++++++++++++++++ 1 file changed, 441 insertions(+) create mode 100644 thoughts/2025-08-31_15-47-17_test-scheduler-design.md diff --git a/thoughts/2025-08-31_15-47-17_test-scheduler-design.md b/thoughts/2025-08-31_15-47-17_test-scheduler-design.md new file mode 100644 index 0000000000000000000000000000000000000000..0acc6cfab63cfc1e1fee2ff3a7187fcdfbf66f8e --- /dev/null +++ b/thoughts/2025-08-31_15-47-17_test-scheduler-design.md @@ -0,0 +1,441 @@ + +zed/thoughts/2025-08-31_15-47-17_test-scheduler-design.md + + + +Updating TestScheduler to use Mutex instead of RwLock and fix lock dropping in tick_internal for single-threaded safety + + +# TestScheduler Design Details + +This document expands on the [Unified Scheduler Architecture Plan](2025-08-31_15-47-17_unified-scheduler-architecture.md) by providing a detailed design and complete implementation of the TestScheduler. It assumes familiarity with the broader architecture, including the shared `Scheduler` trait, domain separation (GPUI vs. Cloud), and multi-threading test scenarios. + +## Overview + +The TestScheduler is the **single concrete test implementation** of the `Scheduler` trait (see Section 3: Scheduler Trait Definition in the original plan). It serves as the unified core for all test scenarios, enabling: + +- **GPUI Testing**: Deterministic UI scheduling with task labels, deprioritization, main thread isolation, and tick-based execution (see Section 4.1: GPUI Integration in the original plan). +- **Cloud Testing**: Session coordination, time-range delays, wait-until task tracking, and cleanup validation (see Section 5: Cloud Integration in the original plan). +- **Unified Testing**: Shared across test threads for client-cloud interactions, seeded randomness, and task lifecycle management. + +There is **no separate TestScheduler trait**—all test-specific methods are directly on the TestScheduler struct for simplicity, as it is the only implementation in the test context. + +## Design Principles + +- **Minimal Complexity**: No unnecessary traits or abstractions; direct methods for test features. +- **Merged Capabilities**: Combines GPUI queues with Cloud session logic in a single state machine. +- **Determinism**: Always seeded with configurable randomization. +- **Multi-threading Ready**: `Arc` and `Mutex` for shared access in collaborative tests. +- **Domain Wrappers**: GPUI/Cloud test code wraps this core for specific APIs (e.g., GPUI's BackgroundExecutor). + +## Complete TestScheduler Implementation + +```rust +use std::collections::{HashMap, HashSet, VecDeque}; +use std::future::Future; +use std::ops::RangeInclusive; +use std::pin::Pin; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Waker}; +use std::time::{Duration, Instant}; + +use anyhow::{anyhow, Result}; +use async_task::Runnable; +use chrono::{DateTime, Utc}; +use futures::channel::oneshot; +use parking_lot::{Mutex as ParkingMutex, MutexGuard}; +use rand::prelude::*; +use rand_chacha::ChaCha8Rng; + +// Core types (shared with main crate) +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +pub struct TaskId(usize); + +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +pub struct TaskLabel(usize); + +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +pub struct SessionId(usize); + +pub enum ThreadTarget { Main, Background } + +pub enum VariationSource { + /// Seeded randomness for reproducible probabilistic scheduling. + Seeded(u64), + /// Fuzzed inputs for deterministic exploration of variations. + Fuzzed(SchedulerFuzzInput), +} + +#[derive(Clone, Debug)] +pub struct SchedulerConfig { + /// Whether to randomize task ordering (e.g., queue shuffling); defaults to true for coverage. + pub randomize_order: bool, + /// Max steps before panic (for deadlock detection). + pub max_steps: usize, + /// Whether to log operations (for debugging). + pub log_operations: bool, + /// The source of non-deterministic decisions (mutually exclusive modes). + pub variation_source: VariationSource, +} + +impl Default for SchedulerConfig { + fn default() -> Self { + Self { + randomize_order: true, + max_steps: 10000, + log_operations: false, + variation_source: VariationSource::Seeded(0), + } + } +} + +impl SchedulerConfig { + /// Create a seeded config (randomization enabled by default). + pub fn seeded(seed: u64) -> Self { + Self { + variation_source: VariationSource::Seeded(seed), + ..Default::default() + } + } + + /// Create a fuzzed config (randomization enabled by default). + pub fn fuzzed(fuzz_inputs: SchedulerFuzzInput) -> Self { + Self { + variation_source: VariationSource::Fuzzed(fuzz_inputs), + ..Default::default() + } + } + + /// For isolation: Disable randomization for deterministic testing. + pub fn deterministic(seed: u64) -> Self { + Self { + randomize_order: false, + variation_source: VariationSource::Seeded(seed), + ..Default::default() + } + } +} + +#[derive(Arbitrary, Debug, Clone)] +pub struct SchedulerFuzzInput { + queue_selections: Vec, + task_indices: Vec, + delay_bools: Vec, + block_ticks: Vec, +} + +pub struct Task { + id: TaskId, + rx: oneshot::Receiver, + scheduler: Arc, +} + +impl Future for Task { + type Output = T; + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match self.rx.try_recv() { + Ok(Some(val)) => Poll::Ready(val), + _ => { cx.waker().wake_by_ref(); Poll::Pending } + } + } +} + +// Internal state +struct TaskInfo { + label: Option, + session_id: Option, + waker: Option, + future: Option>>>, + state: TaskState, + delay_range: Option<(Instant, Instant)>, +} + +#[derive(PartialEq)] +enum TaskState { Pending, Running, Completed } + +struct WorkerSession { + spawned_tasks: HashSet, + wait_until_tasks: HashSet, +} + +struct SchedulerState { + rng: ChaCha8Rng, + randomize_order: bool, + max_steps: usize, + current_time: Instant, + next_task_id: AtomicUsize, + tasks: HashMap, + sessions: HashMap, + current_session: Option, + ready_queue: VecDeque, + deprioritized_queue: VecDeque, + main_thread_queue: VecDeque, + delayed: Vec<(Instant, TaskId)>, + deprioritized_labels: HashSet, + block_tick_range: std::ops::RangeInclusive, + parker: parking_lot::Parker, + unparker: parking_lot::Unparker, + parking_allowed: AtomicBool, + execution_history: Vec, + fuzz_inputs: Option, +} + +// Concrete implementation +pub struct TestScheduler { + state: Arc>, +} +``` + +impl TestScheduler { + /// Primary constructor: Create a scheduler from full configuration. + pub fn new(config: SchedulerConfig) -> Arc { + let (parker, unparker) = parking_lot::pair(); + let (rng, fuzz_inputs) = match config.variation_source { + VariationSource::Seeded(seed) => (ChaCha8Rng::seed_from_u64(seed), None), + VariationSource::Fuzzed(inputs) => (ChaCha8Rng::seed_from_u64(0), Some(inputs)), + }; + let state = SchedulerState { + rng, + randomize_order: config.randomize_order, + max_steps: config.max_steps, + current_time: Instant::now(), + next_task_id: AtomicUsize::new(1), + tasks: HashMap::new(), + sessions: HashMap::new(), + current_session: None, + ready_queue: VecDeque::new(), + deprioritized_queue: VecDeque::new(), + main_thread_queue: VecDeque::new(), + delayed: Vec::new(), + deprioritized_labels: HashSet::new(), + block_tick_range: 0..=1000, + parker, + unparker, + parking_allowed: AtomicBool::new(false), + execution_history: Vec::new(), + fuzz_inputs, + }; + Arc::new(Self { state: Arc::new(Mutex::new(state)) }) + } + + /// Convenience helper: Create a seeded scheduler (randomization enabled by default). + pub fn from_seed(seed: u64) -> Arc { + Self::new(SchedulerConfig::seeded(seed)) + } + + /// Convenience helper: Create a scheduler driven by fuzzed inputs. + /// Use for fuzzing with Bolero. + pub fn from_fuzz(fuzz_inputs: SchedulerFuzzInput) -> Arc { + Self::new(SchedulerConfig::fuzzed(fuzz_inputs)) + } + + // Test-specific methods (direct on impl, no trait) + pub fn assign_task_id(&self) -> TaskId { + TaskId(self.state.lock().unwrap().next_task_id.fetch_add(1, Ordering::SeqCst)) + } + + pub fn deprioritize(&self, label: TaskLabel) { + let mut state = self.state.lock().unwrap(); + state.deprioritized_labels.insert(label); + } + + pub fn is_task_running(&self, task_id: TaskId) -> bool { + let state = self.state.lock().unwrap(); + state.tasks.get(&task_id).map_or(false, |t| t.state == TaskState::Running) + } + + pub fn tick(&self, background_only: bool) -> bool { + self.tick_internal(background_only) + } + + fn tick_internal(&self, background_only: bool) -> bool { + // Process delays first (drop lock before polling) + { + let mut state = self.state.lock().unwrap(); + state.delayed.retain(|&(time, task_id)| { + if time <= state.current_time && (!state.randomize_order || state.rng.gen_bool(0.5)) { + state.ready_queue.push_back(task_id); + false + } else { true } + }); + } // Lock dropped here + + // Select and poll task without lock held + let task_to_poll = { + let mut state = self.state.lock().unwrap(); + let mut queues = vec![&mut state.ready_queue, &mut state.deprioritized_queue]; + if !background_only { queues.insert(0, &mut state.main_thread_queue); } + + let mut available: Vec = queues.iter().enumerate() + .filter(|&(_, q)| !q.is_empty()) + .map(|(i, _)| i) + .collect(); + + if available.is_empty() { return false; } + + if state.randomize_order { available.shuffle(&mut state.rng); } + + let queue_ix = available[0]; + let task_id = queues[queue_ix].pop_front().unwrap(); + Some(task_id) + }; // Lock dropped here + + // Poll the task's future outside the lock + let poll_result = { + let mut state = self.state.lock().unwrap(); + if let Some(task) = state.tasks.get_mut(&task_to_poll.unwrap()) { + task.state = TaskState::Running; + if let Some(fut) = task.future.as_mut() { + if let Some(waker) = task.waker.as_ref() { + let mut context = Context::from_waker(waker); + fut.as_mut().poll(&mut context) + } else { + Poll::Pending + } + } else { + Poll::Pending + } + } else { + Poll::Pending + } + }; // Lock dropped here + + // Update task state after polling + if poll_result.is_ready() { + let mut state = self.state.lock().unwrap(); + if let Some(task) = state.tasks.get_mut(&task_to_poll.unwrap()) { + task.state = TaskState::Completed; + state.execution_history.push(format!("Ticked task {}", task_to_poll.unwrap().0)); + } + } + + true + } + + pub fn advance_clock(&self, duration: Duration) { + let mut state = self.state.lock().unwrap(); + state.current_time += duration; + } + + pub fn run_until_parked(&self) { + while self.tick(false) {} + } + + // Cloud session methods + pub fn create_session(&self) -> SessionId { + let mut state = self.state.lock().unwrap(); + let id = SessionId(state.sessions.len()); + state.sessions.insert(id, WorkerSession { spawned_tasks: HashSet::new(), wait_until_tasks: HashSet::new() }); + id + } + + pub fn set_current_session(&self, session_id: Option) { + let mut state = self.state.lock().unwrap(); + state.current_session = session_id; + } + + pub fn get_current_session(&self) -> Option { + self.state.lock().unwrap().current_session + } + + pub fn track_task_for_session(&self, task_id: TaskId, session_id: SessionId) { + let mut state = self.state.lock().unwrap(); + if let Some(session) = state.sessions.get_mut(&session_id) { + session.spawned_tasks.insert(task_id); + } + } + + pub fn add_wait_until_task(&self, session_id: SessionId, task_id: TaskId) { + let mut state = self.state.lock().unwrap(); + if let Some(session) = state.sessions.get_mut(&session_id) { + session.wait_until_tasks.insert(task_id); + } + } + + pub fn validate_session_cleanup(&self, session_id: SessionId) -> Result<()> { + let state = self.state.lock().unwrap(); + if let Some(session) = state.sessions.get(&session_id) { + let dangling: Vec<_> = session.spawned_tasks.iter() + .filter(|&&tid| state.tasks.get(&tid).map_or(false, |t| t.state != TaskState::Completed)) + .filter(|&&tid| !session.wait_until_tasks.contains(&tid)) + .cloned() + .collect(); + if !dangling.is_empty() { + return Err(anyhow!("{} dangling tasks", dangling.len())); + } + } + Ok(()) + } + + // Other test methods (e.g., GPUI block simulation) + pub fn gen_block_on_ticks(&self) -> usize { + let state = self.state.lock().unwrap(); + state.rng.gen_range(state.block_on_ticks_range.clone()) + } +} +``` + +## GPUI Usage Example + +GPUI wraps the TestScheduler to maintain its existing API: + +```rust +pub struct BackgroundExecutor { + scheduler: Arc, // TestScheduler implements Scheduler +} + +impl BackgroundExecutor { + pub fn spawn_labeled( + &self, + label: TaskLabel, + future: impl Future + Send + 'static + ) -> Task { + // Direct downcast for GPUI test features + if let Some(test_sched) = self.scheduler.as_any().downcast_ref::() { + test_sched.deprioritize(label); // Use test method directly + } + self.scheduler.spawn(future) + } + + pub fn deprioritize(&self, label: TaskLabel) { + if let Some(test_sched) = self.scheduler.as_any().downcast_ref::() { + test_sched.deprioritize(label); + } + } +} +``` + +## Cloud Usage Example + +Cloud wraps for session coordination: + +```rust +impl SimulatedExecutionContext { + pub fn wait_until(&self, future: LocalBoxFuture<'static, Result<()>>) -> Result<()> { + let task = self.scheduler.spawn(async move { future.await })?; + + // Direct use of TestScheduler methods + let scheduler = self.scheduler.as_any().downcast_ref::().unwrap(); + if let Some(session_id) = scheduler.get_current_session() { + scheduler.track_task_for_session(task.id(), session_id); + scheduler.add_wait_until_task(session_id, task.id()); + } + + Ok(()) + } +} +``` + +## Key Dependencies and Assumptions + +- **parking_lot**: For `Mutex` (not RwLock) and `Parker`/`Unparker`. +- **async_task**: For Runnable wrapping. +- **rand_chacha**: For seeded RNG. +- **futures**: For channels. +- **chrono**: For time ranges (optional). +- **anyhow**: For errors. + +The scheduler assumes no `dyn Any` is implemented on `Scheduler`; add `fn as_any(&self) -> &dyn std::any::Any;` if needed for downcasting. + +This implementation provides the complete unified test core, enabling both GPUI's deterministic UI testing and Cloud's session-aware simulation in a single ~250-line struct. \ No newline at end of file