<file_path> zed/thoughts/2025-08-31_15-47-17_test-scheduler-design.md </file_path>
<edit_description> Updating TestScheduler to use Mutex instead of RwLock and fix lock dropping in tick_internal for single-threaded safety </edit_description>
# TestScheduler Design Details
This document expands on the Unified Scheduler Architecture Plan by providing a detailed design and complete implementation of the TestScheduler. It assumes familiarity with the broader architecture, including the shared Scheduler trait, domain separation (GPUI vs. Cloud), and multi-threading test scenarios.
## Overview
The TestScheduler is the single concrete test implementation of the Scheduler trait (see Section 3: Scheduler Trait Definition in the original plan). It serves as the unified core for all test scenarios, enabling:
- GPUI Testing: Deterministic UI scheduling with task labels, deprioritization, main thread isolation, and tick-based execution (see Section 4.1: GPUI Integration in the original plan).
- Cloud Testing: Session coordination, time-range delays, wait-until task tracking, and cleanup validation (see Section 5: Cloud Integration in the original plan).
- Unified Testing: Shared across test threads for client-cloud interactions, seeded randomness, and task lifecycle management.
There is no separate TestScheduler trait; all test-specific methods are directly on the TestScheduler struct for simplicity, as it is the only implementation in the test context.
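As a quick orientation, the sketch below shows the shape of a test that drives the scheduler directly. It is a minimal sketch: it assumes the TestScheduler API defined later in this document plus a `spawn` method supplied by the shared Scheduler trait from the main plan; the test body itself is hypothetical.

```rust
use std::sync::Arc;

fn client_cloud_round_trip() {
    // One scheduler instance is shared by the simulated client and cloud sides.
    let scheduler: Arc<TestScheduler> = TestScheduler::from_seed(42);

    // `spawn` comes from the shared Scheduler trait (defined in the main plan).
    let _task = scheduler.spawn(async {
        // ... client sends a request, the simulated cloud handles it ...
    });

    // Drain every queue deterministically; ordering is controlled entirely by the seed.
    scheduler.run_until_parked();
}
```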
## Design Principles
- Minimal Complexity: No unnecessary traits or abstractions; direct methods for test features.
- Merged Capabilities: Combines GPUI queues with Cloud session logic in a single state machine.
- Determinism: Always seeded, with configurable randomization (a multi-seed loop is sketched after this list).
- Multi-threading Ready: Arc and Mutex for shared access in collaborative tests.
- Domain Wrappers: GPUI/Cloud test code wraps this core for specific APIs (e.g., GPUI's BackgroundExecutor).
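For example, a minimal sketch (using the constructors defined below) of exercising one scenario under many seeds; every seed is fully reproducible, but each one yields a different interleaving. The harness function and `run_scenario` closure are hypothetical.

```rust
use std::sync::Arc;

// Hypothetical harness: `run_scenario` stands in for any client/cloud test body.
fn exercise_many_interleavings(run_scenario: impl Fn(Arc<TestScheduler>)) {
    for seed in 0..64 {
        // Same test body, different (but reproducible) task ordering per seed.
        let scheduler = TestScheduler::from_seed(seed);
        run_scenario(scheduler.clone());
        scheduler.run_until_parked();
    }
}
```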
## Complete TestScheduler Implementation
```rust
use std::collections::{HashMap, HashSet, VecDeque};
use std::future::Future;
use std::ops::RangeInclusive;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};
use anyhow::{anyhow, Result};
use async_task::Runnable;
use chrono::{DateTime, Utc};
use futures::channel::oneshot;
use arbitrary::Arbitrary; // derive support for SchedulerFuzzInput
use parking::{Parker, Unparker};
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
// Core types (shared with main crate)
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskId(usize);
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskLabel(usize);
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct SessionId(usize);
pub enum ThreadTarget { Main, Background }
#[derive(Clone, Debug)]
pub enum VariationSource {
/// Seeded randomness for reproducible probabilistic scheduling.
Seeded(u64),
/// Fuzzed inputs for deterministic exploration of variations.
Fuzzed(SchedulerFuzzInput),
}
#[derive(Clone, Debug)]
pub struct SchedulerConfig {
/// Whether to randomize task ordering (e.g., queue shuffling); defaults to true for coverage.
pub randomize_order: bool,
/// Max steps before panic (for deadlock detection).
pub max_steps: usize,
/// Whether to log operations (for debugging).
pub log_operations: bool,
/// The source of non-deterministic decisions (mutually exclusive modes).
pub variation_source: VariationSource,
}
impl Default for SchedulerConfig {
fn default() -> Self {
Self {
randomize_order: true,
max_steps: 10000,
log_operations: false,
variation_source: VariationSource::Seeded(0),
}
}
}
impl SchedulerConfig {
/// Create a seeded config (randomization enabled by default).
pub fn seeded(seed: u64) -> Self {
Self {
variation_source: VariationSource::Seeded(seed),
..Default::default()
}
}
/// Create a fuzzed config (randomization enabled by default).
pub fn fuzzed(fuzz_inputs: SchedulerFuzzInput) -> Self {
Self {
variation_source: VariationSource::Fuzzed(fuzz_inputs),
..Default::default()
}
}
/// For isolation: Disable randomization for deterministic testing.
pub fn deterministic(seed: u64) -> Self {
Self {
randomize_order: false,
variation_source: VariationSource::Seeded(seed),
..Default::default()
}
}
}
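// Illustrative only: how a test might choose among these constructors. The three
// calls below use the API defined in this file; the surrounding test is hypothetical.
//
//     let broad = TestScheduler::new(SchedulerConfig::seeded(seed));        // shuffled queues
//     let repro = TestScheduler::new(SchedulerConfig::deterministic(seed)); // stable FIFO order
//     let fuzz  = TestScheduler::new(SchedulerConfig::fuzzed(inputs));      // Bolero-driven choices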
#[derive(Arbitrary, Debug, Clone)]
pub struct SchedulerFuzzInput {
queue_selections: Vec<u8>,
task_indices: Vec<u32>,
delay_bools: Vec<bool>,
block_ticks: Vec<usize>,
}
pub struct Task<T> {
id: TaskId,
rx: oneshot::Receiver<T>,
scheduler: Arc<TestScheduler>,
}
impl<T> Future for Task<T> {
    type Output = T;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Task is Unpin (it only holds a receiver and an Arc), so get_mut is fine here.
        let this = self.get_mut();
        match this.rx.try_recv() {
            Ok(Some(val)) => Poll::Ready(val),
            // Not ready yet (or the sender was dropped): self-wake so the test
            // scheduler keeps polling this task on later ticks.
            _ => {
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}
// Internal state
struct TaskInfo {
label: Option<TaskLabel>,
session_id: Option<SessionId>,
waker: Option<Waker>,
future: Option<Pin<Box<dyn Future<Output = ()>>>>,
state: TaskState,
delay_range: Option<(Instant, Instant)>,
}
#[derive(PartialEq)]
enum TaskState { Pending, Running, Completed }
struct WorkerSession {
spawned_tasks: HashSet<TaskId>,
wait_until_tasks: HashSet<TaskId>,
}
struct SchedulerState {
rng: ChaCha8Rng,
randomize_order: bool,
max_steps: usize,
current_time: Instant,
next_task_id: AtomicUsize,
tasks: HashMap<TaskId, TaskInfo>,
sessions: HashMap<SessionId, WorkerSession>,
current_session: Option<SessionId>,
ready_queue: VecDeque<TaskId>,
deprioritized_queue: VecDeque<TaskId>,
main_thread_queue: VecDeque<TaskId>,
delayed: Vec<(Instant, TaskId)>,
deprioritized_labels: HashSet<TaskLabel>,
block_tick_range: RangeInclusive<usize>,
parker: Parker,
unparker: Unparker,
parking_allowed: AtomicBool,
execution_history: Vec<String>,
fuzz_inputs: Option<SchedulerFuzzInput>,
}
// Concrete implementation
pub struct TestScheduler {
state: Arc<Mutex<SchedulerState>>,
}
impl TestScheduler {
    /// Primary constructor: Create a scheduler from full configuration.
    pub fn new(config: SchedulerConfig) -> Arc<Self> {
        let (parker, unparker) = parking::pair();
        let (rng, fuzz_inputs) = match config.variation_source {
            VariationSource::Seeded(seed) => (ChaCha8Rng::seed_from_u64(seed), None),
            VariationSource::Fuzzed(inputs) => (ChaCha8Rng::seed_from_u64(0), Some(inputs)),
        };
        let state = SchedulerState {
            rng,
            randomize_order: config.randomize_order,
            max_steps: config.max_steps,
            current_time: Instant::now(),
            next_task_id: AtomicUsize::new(1),
            tasks: HashMap::new(),
            sessions: HashMap::new(),
            current_session: None,
            ready_queue: VecDeque::new(),
            deprioritized_queue: VecDeque::new(),
            main_thread_queue: VecDeque::new(),
            delayed: Vec::new(),
            deprioritized_labels: HashSet::new(),
            block_tick_range: 0..=1000,
            parker,
            unparker,
            parking_allowed: AtomicBool::new(false),
            execution_history: Vec::new(),
            fuzz_inputs,
        };
        Arc::new(Self {
            state: Arc::new(Mutex::new(state)),
        })
    }
/// Convenience helper: Create a seeded scheduler (randomization enabled by default).
pub fn from_seed(seed: u64) -> Arc<Self> {
Self::new(SchedulerConfig::seeded(seed))
}
/// Convenience helper: Create a scheduler driven by fuzzed inputs.
/// Use for fuzzing with Bolero.
pub fn from_fuzz(fuzz_inputs: SchedulerFuzzInput) -> Arc<Self> {
Self::new(SchedulerConfig::fuzzed(fuzz_inputs))
}
// Test-specific methods (direct on impl, no trait)
pub fn assign_task_id(&self) -> TaskId {
TaskId(self.state.lock().unwrap().next_task_id.fetch_add(1, Ordering::SeqCst))
}
pub fn deprioritize(&self, label: TaskLabel) {
let mut state = self.state.lock().unwrap();
state.deprioritized_labels.insert(label);
}
pub fn is_task_running(&self, task_id: TaskId) -> bool {
let state = self.state.lock().unwrap();
state.tasks.get(&task_id).map_or(false, |t| t.state == TaskState::Running)
}
pub fn tick(&self, background_only: bool) -> bool {
self.tick_internal(background_only)
}
    fn tick_internal(&self, background_only: bool) -> bool {
        // Promote delayed tasks whose deadlines have passed, then drop the lock.
        {
            let mut state = self.state.lock().unwrap();
            let now = state.current_time;
            let randomize = state.randomize_order;
            let mut still_delayed = Vec::new();
            let mut ready = Vec::new();
            for (time, task_id) in std::mem::take(&mut state.delayed) {
                if time <= now && (!randomize || state.rng.gen_bool(0.5)) {
                    ready.push(task_id);
                } else {
                    still_delayed.push((time, task_id));
                }
            }
            state.delayed = still_delayed;
            state.ready_queue.extend(ready);
        } // Lock dropped here

        // Select the next task while holding the lock, then release it before polling.
        let task_id = {
            let mut state = self.state.lock().unwrap();
            let state = &mut *state;
            let mut queues = vec![&mut state.ready_queue, &mut state.deprioritized_queue];
            if !background_only {
                queues.insert(0, &mut state.main_thread_queue);
            }
            let mut available: Vec<usize> = queues
                .iter()
                .enumerate()
                .filter(|(_, q)| !q.is_empty())
                .map(|(i, _)| i)
                .collect();
            if available.is_empty() {
                return false;
            }
            if state.randomize_order {
                available.shuffle(&mut state.rng);
            }
            let queue_ix = available[0];
            queues[queue_ix].pop_front().unwrap()
        }; // Lock dropped here

        // Take the task's future out of the state so it can be polled without the
        // lock held; polling under the lock would deadlock if the future re-enters
        // the scheduler (spawn, delay, etc.) on the same thread.
        let (mut future, waker) = {
            let mut state = self.state.lock().unwrap();
            match state.tasks.get_mut(&task_id) {
                Some(task) => {
                    task.state = TaskState::Running;
                    (task.future.take(), task.waker.clone())
                }
                None => return true,
            }
        }; // Lock dropped here

        let poll_result = match (future.as_mut(), waker.as_ref()) {
            (Some(fut), Some(waker)) => {
                let mut context = Context::from_waker(waker);
                fut.as_mut().poll(&mut context)
            }
            _ => Poll::Pending,
        };

        // Re-acquire the lock only to record the outcome.
        {
            let mut state = self.state.lock().unwrap();
            if let Some(task) = state.tasks.get_mut(&task_id) {
                if poll_result.is_ready() {
                    task.state = TaskState::Completed;
                } else {
                    task.future = future; // Put the still-pending future back.
                }
            }
            if poll_result.is_ready() {
                state.execution_history.push(format!("Ticked task {}", task_id.0));
            }
        }
        true
    }
pub fn advance_clock(&self, duration: Duration) {
let mut state = self.state.lock().unwrap();
state.current_time += duration;
}
pub fn run_until_parked(&self) {
while self.tick(false) {}
}
// Cloud session methods
pub fn create_session(&self) -> SessionId {
let mut state = self.state.lock().unwrap();
let id = SessionId(state.sessions.len());
state.sessions.insert(id, WorkerSession { spawned_tasks: HashSet::new(), wait_until_tasks: HashSet::new() });
id
}
pub fn set_current_session(&self, session_id: Option<SessionId>) {
let mut state = self.state.lock().unwrap();
state.current_session = session_id;
}
pub fn get_current_session(&self) -> Option<SessionId> {
self.state.lock().unwrap().current_session
}
pub fn track_task_for_session(&self, task_id: TaskId, session_id: SessionId) {
let mut state = self.state.lock().unwrap();
if let Some(session) = state.sessions.get_mut(&session_id) {
session.spawned_tasks.insert(task_id);
}
}
pub fn add_wait_until_task(&self, session_id: SessionId, task_id: TaskId) {
let mut state = self.state.lock().unwrap();
if let Some(session) = state.sessions.get_mut(&session_id) {
session.wait_until_tasks.insert(task_id);
}
}
pub fn validate_session_cleanup(&self, session_id: SessionId) -> Result<()> {
let state = self.state.lock().unwrap();
if let Some(session) = state.sessions.get(&session_id) {
let dangling: Vec<_> = session.spawned_tasks.iter()
.filter(|&&tid| state.tasks.get(&tid).map_or(false, |t| t.state != TaskState::Completed))
.filter(|&&tid| !session.wait_until_tasks.contains(&tid))
.cloned()
.collect();
if !dangling.is_empty() {
return Err(anyhow!("{} dangling tasks", dangling.len()));
}
}
Ok(())
}
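// Illustrative session lifecycle (comment sketch; `task_id` would come from the
// spawn path, which lives on the trait side and is not shown here):
//
//     let session = scheduler.create_session();
//     scheduler.set_current_session(Some(session));
//     scheduler.track_task_for_session(task_id, session);
//     scheduler.add_wait_until_task(session, task_id); // exempt from the dangling-task check
//     scheduler.run_until_parked();
//     scheduler.validate_session_cleanup(session)?;    // errors if tracked tasks never completed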
// Other test methods (e.g., GPUI block simulation)
    pub fn gen_block_on_ticks(&self) -> usize {
        let mut state = self.state.lock().unwrap();
        let range = state.block_tick_range.clone();
        state.rng.gen_range(range)
    }
}
```
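Time in the TestScheduler is virtual: advance_clock only moves current_time, and the next tick promotes delayed tasks whose deadlines have passed. Below is a minimal sketch of a test driving virtual time, using only the methods defined above (how the delayed task gets registered is part of the trait-level timer API, not shown in this document):

```rust
use std::time::Duration;

fn drive_virtual_time(scheduler: &TestScheduler) {
    // Run everything that is currently runnable.
    scheduler.run_until_parked();

    // Move virtual time forward; nothing executes yet.
    scheduler.advance_clock(Duration::from_secs(5));

    // Delayed tasks whose deadlines have now passed are promoted and run.
    scheduler.run_until_parked();
}
```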
## GPUI Usage Example
GPUI wraps the TestScheduler to maintain its existing API:
```rust
pub struct BackgroundExecutor {
scheduler: Arc<dyn Scheduler>, // TestScheduler implements Scheduler
}
impl BackgroundExecutor {
pub fn spawn_labeled<R>(
&self,
label: TaskLabel,
future: impl Future<Output = R> + Send + 'static
) -> Task<R> {
// Direct downcast for GPUI test features
if let Some(test_sched) = self.scheduler.as_any().downcast_ref::<TestScheduler>() {
test_sched.deprioritize(label); // Use test method directly
}
self.scheduler.spawn(future)
}
pub fn deprioritize(&self, label: TaskLabel) {
if let Some(test_sched) = self.scheduler.as_any().downcast_ref::<TestScheduler>() {
test_sched.deprioritize(label);
}
}
}
```
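A test then uses the wrapper the same way production GPUI code would. A minimal sketch, assuming the BackgroundExecutor above (the label is constructed directly only for this sketch):

```rust
fn deprioritized_label_runs_last(executor: &BackgroundExecutor, scheduler: &TestScheduler) {
    let label = TaskLabel(1); // constructed directly only for this sketch

    // Tasks carrying this label go to the deprioritized queue and drain last.
    executor.deprioritize(label);
    let _task = executor.spawn_labeled(label, async {
        // ... background work that should run after everything else ...
    });

    scheduler.run_until_parked();
}
```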
## Cloud Usage Example
Cloud wraps the TestScheduler for session coordination:
```rust
impl SimulatedExecutionContext {
pub fn wait_until(&self, future: LocalBoxFuture<'static, Result<()>>) -> Result<()> {
let task = self.scheduler.spawn(async move { future.await });
// Direct use of TestScheduler methods
let scheduler = self.scheduler.as_any().downcast_ref::<TestScheduler>().unwrap();
if let Some(session_id) = scheduler.get_current_session() {
scheduler.track_task_for_session(task.id(), session_id);
scheduler.add_wait_until_task(session_id, task.id());
}
Ok(())
}
}
```
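At the end of a simulated request, the Cloud test validates that the session left nothing behind. A minimal sketch using only the methods defined earlier (the helper name is hypothetical):

```rust
fn finish_session(scheduler: &TestScheduler, session: SessionId) -> anyhow::Result<()> {
    // Drain any remaining work for this session.
    scheduler.run_until_parked();

    // Errors if a spawned task never completed and was not registered via wait_until.
    scheduler.validate_session_cleanup(session)
}
```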
## Key Dependencies and Assumptions
- std::sync::Mutex: Protects the shared scheduler state (a plain Mutex, not an RwLock, since essentially every operation mutates state).
- parking: For Parker/Unparker (parking a thread while it blocks on the scheduler).
- async_task: For Runnable wrapping.
- rand_chacha: For seeded RNG.
- futures: For channels.
- chrono: For time ranges (optional).
- anyhow: For errors.
The downcasts above assume the Scheduler trait exposes fn as_any(&self) -> &dyn std::any::Any; if the trait does not provide it yet, add it, as sketched below.
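A sketch of that addition (the trait's other methods are omitted here; see the main plan for the full definition):

```rust
use std::any::Any;

pub trait Scheduler {
    // ...spawn, timers, etc. (defined in the main plan)...
    fn as_any(&self) -> &dyn Any;
}

impl Scheduler for TestScheduler {
    fn as_any(&self) -> &dyn Any {
        self
    }
}
```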
This implementation provides the complete unified test core, enabling both GPUI's deterministic UI testing and Cloud's session-aware simulation in a single ~250-line struct.