2025-08-31_15-47-17_test-scheduler-design.md

# TestScheduler Design Details

This document expands on the [Unified Scheduler Architecture Plan](2025-08-31_15-47-17_unified-scheduler-architecture.md) with a detailed design and a complete implementation of the TestScheduler. It assumes familiarity with the broader architecture, including the shared `Scheduler` trait, domain separation (GPUI vs. Cloud), and multi-threaded test scenarios. **Updates incorporate `Executor` and `ForegroundExecutor` wrappers around `Arc<dyn Scheduler>`. `ForegroundExecutor` uses `PhantomData<Rc<()>>` to be `!Send` and panics if used off its creation thread; the same wrappers apply to both GPUI and Cloud for consistency and simplicity.**

## Overview

The TestScheduler is the **single concrete test implementation** of the `Scheduler` trait (see Section 3: Scheduler Trait Definition in the original plan). It serves as the unified core for all test scenarios, enabling:

- **GPUI Testing**: Deterministic UI scheduling with task labels, deprioritization, main thread isolation, and tick-based execution (see Section 4.1: GPUI Integration in the original plan).
- **Cloud Testing**: Session coordination, time-range delays, wait-until task tracking, and cleanup validation (see Section 5: Cloud Integration in the original plan). **`ForegroundExecutor` is now used in Cloud for single-threaded simplicity, avoiding `Send` requirements on futures.**
- **Unified Testing**: Shared across test threads for client-cloud interactions, seeded randomness, and task lifecycle management.

There is **no separate TestScheduler trait**; all test-specific methods live directly on the `TestScheduler` struct for simplicity, since it is the only implementation in the test context. **Executors wrap `Arc<dyn Scheduler>`, and `ForegroundExecutor` enforces thread affinity via a phantom `Rc` and creation-thread checks.**

## Design Principles

- **Minimal Complexity**: No unnecessary traits or abstractions; direct methods for test features.
- **Merged Capabilities**: Combines GPUI queues with Cloud session logic in a single state machine.
- **Determinism**: Always seeded; order randomization is configurable.
- **Multi-threading Ready**: `Arc` and `Mutex` for shared access in collaborative tests.
- **Domain Wrappers**: GPUI/Cloud test code wraps this core for specific APIs (e.g., GPUI's BackgroundExecutor now uses `Executor` and `ForegroundExecutor`).
- **Thread Safety Enforcement**: `ForegroundExecutor` uses a phantom `Rc` to be `!Send` and checks against the creation thread for main-thread isolation.
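
The determinism principle can be illustrated without any of the scheduler machinery. The sketch below is std-only and makes several assumptions: `SplitMix64` is a small stand-in for `ChaCha8Rng`, `schedule_order` is a hypothetical helper that does not exist in the implementation, and plain `usize` values stand in for task IDs. It only shows that a fixed seed yields the same task order on every run.

```rust
/// SplitMix64, a small deterministic PRNG used here only as a std-only
/// stand-in for `ChaCha8Rng` (assumption: any seedable generator would do).
pub struct SplitMix64(u64);

impl SplitMix64 {
    pub fn new(seed: u64) -> Self {
        Self(seed)
    }

    pub fn next_u64(&mut self) -> u64 {
        self.0 = self.0.wrapping_add(0x9E37_79B9_7F4A_7C15);
        let mut z = self.0;
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    }
}

/// Hypothetical helper: Fisher-Yates shuffle of task IDs driven by the seed.
/// The modulo introduces a slight bias, which does not matter for a sketch.
pub fn schedule_order(seed: u64, task_ids: &[usize]) -> Vec<usize> {
    let mut rng = SplitMix64::new(seed);
    let mut order = task_ids.to_vec();
    for i in (1..order.len()).rev() {
        let j = (rng.next_u64() % (i as u64 + 1)) as usize;
        order.swap(i, j);
    }
    order
}
```

Calling `schedule_order` twice with the same seed and the same task list returns the same permutation, which is the property a failing test relies on when it is replayed from its seed.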

## Complete TestScheduler Implementation

```rust
use std::collections::{HashMap, HashSet, VecDeque};
use std::future::Future;
use std::ops::RangeInclusive;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};

use anyhow::{anyhow, Result};
// Assumption: the `arbitrary` crate (with its `derive` feature) supplies the
// `Arbitrary` derive used by `SchedulerFuzzInput`.
use arbitrary::Arbitrary;
use async_task::Runnable;
use chrono::{DateTime, Utc};
use futures::channel::oneshot;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;

// Core types (shared with main crate)
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskId(usize);

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskLabel(usize);

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct SessionId(usize);

pub enum ThreadTarget { Main, Background }

// Clone and Debug are required because SchedulerConfig derives both
#[derive(Clone, Debug)]
pub enum VariationSource {
    /// Seeded randomness for reproducible probabilistic scheduling.
    Seeded(u64),
    /// Fuzzed inputs for deterministic exploration of variations.
    Fuzzed(SchedulerFuzzInput),
}

#[derive(Clone, Debug)]
pub struct SchedulerConfig {
    /// Whether to randomize task ordering (e.g., queue shuffling); defaults to true for coverage.
    pub randomize_order: bool,
    /// Max steps before panic (for deadlock detection).
    pub max_steps: usize,
    /// Whether to log operations (for debugging).
    pub log_operations: bool,
    /// The source of non-deterministic decisions (mutually exclusive modes).
    pub variation_source: VariationSource,
}

impl Default for SchedulerConfig {
    fn default() -> Self {
        Self {
            randomize_order: true,
            max_steps: 10000,
            log_operations: false,
            variation_source: VariationSource::Seeded(0),
        }
    }
}

impl SchedulerConfig {
    /// Create a seeded config (randomization enabled by default).
    pub fn seeded(seed: u64) -> Self {
        Self {
            variation_source: VariationSource::Seeded(seed),
            ..Default::default()
        }
    }

    /// Create a fuzzed config (randomization enabled by default).
    pub fn fuzzed(fuzz_inputs: SchedulerFuzzInput) -> Self {
        Self {
            variation_source: VariationSource::Fuzzed(fuzz_inputs),
            ..Default::default()
        }
    }

    /// For isolation: Disable randomization for deterministic testing.
    pub fn deterministic(seed: u64) -> Self {
        Self {
            randomize_order: false,
            variation_source: VariationSource::Seeded(seed),
            ..Default::default()
        }
    }
}

#[derive(Arbitrary, Debug, Clone)]
pub struct SchedulerFuzzInput {
    queue_selections: Vec<u8>,
    task_indices: Vec<u32>,
    delay_bools: Vec<bool>,
    block_ticks: Vec<usize>,
}
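
pub struct Task<T> {
    id: TaskId,
    rx: oneshot::Receiver<T>,
    scheduler: Arc<TestScheduler>,
}

impl<T> Task<T> {
    /// Identifier used for session tracking.
    pub fn id(&self) -> TaskId { self.id }
}

impl<T> Future for Task<T> {
    type Output = T;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Polling the receiver registers the waker, so the task is woken when
        // the result is sent instead of re-waking itself on every poll.
        match Pin::new(&mut self.rx).poll(cx) {
            Poll::Ready(Ok(val)) => Poll::Ready(val),
            // Assumption: a dropped sender means the task was lost, which is a test bug
            Poll::Ready(Err(_)) => panic!("task canceled before completion"),
            Poll::Pending => Poll::Pending,
        }
    }
}
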
// Internal state
struct TaskInfo {
    label: Option<TaskLabel>,
    session_id: Option<SessionId>,
    waker: Option<Waker>,
    future: Option<Pin<Box<dyn Future<Output = ()>>>>,
    state: TaskState,
    delay_range: Option<(Instant, Instant)>,
}

#[derive(PartialEq)]
enum TaskState { Pending, Running, Completed }

struct WorkerSession {
    spawned_tasks: HashSet<TaskId>,
    wait_until_tasks: HashSet<TaskId>,
}

struct SchedulerState {
    rng: ChaCha8Rng,
    randomize_order: bool,
    max_steps: usize,
    current_time: Instant,
    next_task_id: AtomicUsize,
    tasks: HashMap<TaskId, TaskInfo>,
    sessions: HashMap<SessionId, WorkerSession>,
    current_session: Option<SessionId>,
    ready_queue: VecDeque<TaskId>,
    deprioritized_queue: VecDeque<TaskId>,
    main_thread_queue: VecDeque<TaskId>,
    delayed: Vec<(Instant, TaskId)>,
    deprioritized_labels: HashSet<TaskLabel>,
    block_tick_range: RangeInclusive<usize>,
    // Parker/Unparker come from the `parking` crate; `parking_lot` has no such types
    parker: parking::Parker,
    unparker: parking::Unparker,
    parking_allowed: AtomicBool,
    execution_history: Vec<String>,
    fuzz_inputs: Option<SchedulerFuzzInput>,
    creation_thread_id: thread::ThreadId,  // Added for thread safety checks
}

// Concrete implementation
pub struct TestScheduler {
    state: Arc<Mutex<SchedulerState>>,
}

impl TestScheduler {
    /// Primary constructor: Create a scheduler from full configuration.
    pub fn new(config: SchedulerConfig) -> Arc<Self> {
        // `pair()` is provided by the `parking` crate, not `parking_lot`
        let (parker, unparker) = parking::pair();
        let (rng, fuzz_inputs) = match config.variation_source {
            VariationSource::Seeded(seed) => (ChaCha8Rng::seed_from_u64(seed), None),
            VariationSource::Fuzzed(inputs) => (ChaCha8Rng::seed_from_u64(0), Some(inputs)),
        };
        let state = SchedulerState {
            rng,
            randomize_order: config.randomize_order,
            max_steps: config.max_steps,
            current_time: Instant::now(),
            next_task_id: AtomicUsize::new(1),
            tasks: HashMap::new(),
            sessions: HashMap::new(),
            current_session: None,
            ready_queue: VecDeque::new(),
            deprioritized_queue: VecDeque::new(),
            main_thread_queue: VecDeque::new(),
            delayed: Vec::new(),
            deprioritized_labels: HashSet::new(),
            block_tick_range: 0..=1000,
            parker,
            unparker,
            parking_allowed: AtomicBool::new(false),
            execution_history: Vec::new(),
            fuzz_inputs,
            creation_thread_id: thread::current().id(),  // Capture creation thread
        };
        Arc::new(Self { state: Arc::new(Mutex::new(state)) })
    }

    // Added for ForegroundExecutor thread checks
    pub fn assert_main_thread(&self) {
        let state = self.state.lock().unwrap();
        if thread::current().id() != state.creation_thread_id {
            panic!("ForegroundExecutor method called from wrong thread");
        }
    }

    /// Convenience helper: Create a seeded scheduler (randomization enabled by default).
    pub fn from_seed(seed: u64) -> Arc<Self> {
        Self::new(SchedulerConfig::seeded(seed))
    }

    /// Convenience helper: Create a scheduler driven by fuzzed inputs.
    /// Use for fuzzing with Bolero.
    pub fn from_fuzz(fuzz_inputs: SchedulerFuzzInput) -> Arc<Self> {
        Self::new(SchedulerConfig::fuzzed(fuzz_inputs))
    }

    // Test-specific methods (direct on impl, no trait)
    pub fn assign_task_id(&self) -> TaskId {
        TaskId(self.state.lock().unwrap().next_task_id.fetch_add(1, Ordering::SeqCst))
    }

    pub fn deprioritize(&self, label: TaskLabel) {
        let mut state = self.state.lock().unwrap();
        state.deprioritized_labels.insert(label);
    }

    pub fn is_task_running(&self, task_id: TaskId) -> bool {
        let state = self.state.lock().unwrap();
        state.tasks.get(&task_id).map_or(false, |t| t.state == TaskState::Running)
    }

    pub fn tick(&self, background_only: bool) -> bool {
        self.tick_internal(background_only)
    }

    fn tick_internal(&self, background_only: bool) -> bool {
        // Move due delayed tasks onto the ready queue. With randomization on,
        // each due task is released with probability 0.5 per tick.
        {
            let mut guard = self.state.lock().unwrap();
            let state = &mut *guard; // Reborrow so fields can be borrowed independently
            let now = state.current_time;
            for (time, task_id) in std::mem::take(&mut state.delayed) {
                if time <= now && (!state.randomize_order || state.rng.gen_bool(0.5)) {
                    state.ready_queue.push_back(task_id);
                } else {
                    state.delayed.push((time, task_id));
                }
            }
        } // Lock dropped here

        // Select the next task while holding the lock
        let task_id = {
            let mut guard = self.state.lock().unwrap();
            let state = &mut *guard;
            let mut queues = vec![&mut state.ready_queue, &mut state.deprioritized_queue];
            if !background_only { queues.insert(0, &mut state.main_thread_queue); }

            let mut available: Vec<usize> = queues.iter().enumerate()
                .filter(|(_, q)| !q.is_empty())
                .map(|(i, _)| i)
                .collect();

            if available.is_empty() { return false; }

            if state.randomize_order { available.shuffle(&mut state.rng); }

            queues[available[0]].pop_front().unwrap()
        }; // Lock dropped here

        // Take the future out of the task table so it can be polled without
        // holding the lock (a polled future may call back into the scheduler)
        let (mut future, waker) = {
            let mut state = self.state.lock().unwrap();
            let Some(task) = state.tasks.get_mut(&task_id) else { return true };
            task.state = TaskState::Running;
            match (task.future.take(), task.waker.clone()) {
                (Some(future), Some(waker)) => (future, waker),
                (future, _) => {
                    // Nothing to poll; put back whatever was taken
                    task.future = future;
                    return true;
                }
            }
        }; // Lock dropped here

        let poll_result = future.as_mut().poll(&mut Context::from_waker(&waker));

        // Record completion, or hand the still-pending future back to its task
        let mut guard = self.state.lock().unwrap();
        let state = &mut *guard;
        if let Some(task) = state.tasks.get_mut(&task_id) {
            if poll_result.is_ready() {
                task.state = TaskState::Completed;
                state.execution_history.push(format!("Ticked task {}", task_id.0));
            } else {
                task.future = Some(future);
            }
        }

        true
    }

    pub fn advance_clock(&self, duration: Duration) {
        let mut state = self.state.lock().unwrap();
        state.current_time += duration;
    }

    pub fn run_until_parked(&self) {
        while self.tick(false) {}
    }

    // Cloud session methods
    pub fn create_session(&self) -> SessionId {
        let mut state = self.state.lock().unwrap();
        let id = SessionId(state.sessions.len());
        state.sessions.insert(id, WorkerSession { spawned_tasks: HashSet::new(), wait_until_tasks: HashSet::new() });
        id
    }

    pub fn set_current_session(&self, session_id: Option<SessionId>) {
        let mut state = self.state.lock().unwrap();
        state.current_session = session_id;
    }

    pub fn get_current_session(&self) -> Option<SessionId> {
        self.state.lock().unwrap().current_session
    }

    pub fn track_task_for_session(&self, task_id: TaskId, session_id: SessionId) {
        let mut state = self.state.lock().unwrap();
        if let Some(session) = state.sessions.get_mut(&session_id) {
            session.spawned_tasks.insert(task_id);
        }
    }

    pub fn add_wait_until_task(&self, session_id: SessionId, task_id: TaskId) {
        let mut state = self.state.lock().unwrap();
        if let Some(session) = state.sessions.get_mut(&session_id) {
            session.wait_until_tasks.insert(task_id);
        }
    }

    pub fn validate_session_cleanup(&self, session_id: SessionId) -> Result<()> {
        let state = self.state.lock().unwrap();
        if let Some(session) = state.sessions.get(&session_id) {
            // Dangling: spawned by the session, not completed, and not registered via wait_until
            let dangling: Vec<_> = session.spawned_tasks.iter()
                .filter(|&&tid| state.tasks.get(&tid).map_or(false, |t| t.state != TaskState::Completed))
                .filter(|&&tid| !session.wait_until_tasks.contains(&tid))
                .cloned()
                .collect();
            if !dangling.is_empty() {
                return Err(anyhow!("{} dangling tasks", dangling.len()));
            }
        }
        Ok(())
    }

    // Other test methods (e.g., GPUI block simulation)
    pub fn gen_block_on_ticks(&self) -> usize {
        // The RNG needs mutable access, and the field is named `block_tick_range`
        let mut guard = self.state.lock().unwrap();
        let state = &mut *guard;
        state.rng.gen_range(state.block_tick_range.clone())
    }
}
```
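
The interaction between `advance_clock` and the delayed-task pass in `tick_internal` can be illustrated in isolation. The following is a minimal, std-only sketch of the non-randomized case. `VirtualClock`, `schedule_after`, `release_due`, and `pop_ready` are hypothetical names that do not appear in the implementation, and plain `usize` values stand in for `TaskId`.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Hypothetical miniature of the scheduler's clock and delay queue.
pub struct VirtualClock {
    now: Instant,
    delayed: Vec<(Instant, usize)>, // (deadline, task id)
    ready: VecDeque<usize>,
}

impl VirtualClock {
    pub fn new() -> Self {
        Self { now: Instant::now(), delayed: Vec::new(), ready: VecDeque::new() }
    }

    /// Registers a task that becomes runnable `delay` after the current virtual time.
    pub fn schedule_after(&mut self, delay: Duration, task_id: usize) {
        self.delayed.push((self.now + delay, task_id));
    }

    /// Moves virtual time forward; no real time passes.
    pub fn advance_clock(&mut self, duration: Duration) {
        self.now += duration;
    }

    /// Moves every task whose deadline has passed onto the ready queue.
    pub fn release_due(&mut self) {
        let now = self.now;
        for (deadline, task_id) in std::mem::take(&mut self.delayed) {
            if deadline <= now {
                self.ready.push_back(task_id);
            } else {
                self.delayed.push((deadline, task_id));
            }
        }
    }

    pub fn pop_ready(&mut self) -> Option<usize> {
        self.ready.pop_front()
    }
}
```

In this model a task scheduled five seconds out stays in `delayed` until the clock has been advanced by at least five seconds in total, regardless of how much wall-clock time the test takes. With `randomize_order` enabled, the real scheduler may additionally hold a due task back for a later tick.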

## GPUI Usage Example

GPUI wraps the TestScheduler using `Executor` and `ForegroundExecutor`:

```rust
use std::future::Future;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;

// Generic Executor for background tasks (Send futures)
pub struct Executor {
    scheduler: Arc<dyn Scheduler>,
}

impl Executor {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
        Self { scheduler }
    }

    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
    where R: Send + 'static {
        // Delegate to scheduler via downcast for test methods if applicable
        if let Some(_test_sched) = self.scheduler.as_any().downcast_ref::<TestScheduler>() {
            // Use test_sched methods
        }
        self.scheduler.spawn(future)
    }

    pub fn spawn_labeled<R>(
        &self,
        label: TaskLabel,
        future: impl Future<Output = R> + Send + 'static
    ) -> Task<R>
    where R: Send + 'static {
        // Labeling alone does not deprioritize; tests opt in via `deprioritize`
        self.scheduler.spawn_labeled(label, future)
    }

    pub fn deprioritize(&self, label: TaskLabel) {
        if let Some(test_sched) = self.scheduler.as_any().downcast_ref::<TestScheduler>() {
            test_sched.deprioritize(label);
        }
    }

    pub fn timer(&self, duration: Duration) -> Task<()> {
        self.scheduler.timer(duration)
    }

    /// Returns `None` when the underlying scheduler is not a `TestScheduler`.
    pub fn tick(&self) -> Option<bool> {
        self.scheduler.as_any().downcast_ref::<TestScheduler>().map(|ts| ts.tick(false))
    }
}

// ForegroundExecutor for main-thread tasks (!Send futures, thread checks)
pub struct ForegroundExecutor {
    executor: Executor,
    // `Rc<()>` is !Send, so this marker makes the whole struct !Send on stable
    // Rust; an explicit `impl !Send` would require the nightly `negative_impls` feature.
    _phantom: PhantomData<Rc<()>>,
}

impl ForegroundExecutor {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
        let executor = Executor::new(scheduler);
        // Check thread immediately via scheduler
        if let Some(test_sched) = executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
            test_sched.assert_main_thread();
        } else {
            // Production: assume created on main thread
        }
        Ok(Self { executor, _phantom: PhantomData })
    }

    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
    where R: 'static {
        // Assert thread before delegating
        if let Some(test_sched) = self.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
            test_sched.assert_main_thread();
        }
        self.executor.scheduler.spawn_foreground(future)
    }

    pub fn timer(&self, duration: Duration) -> Task<()> {
        if let Some(test_sched) = self.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
            test_sched.assert_main_thread();
        }
        self.executor.scheduler.timer(duration)
    }

    // Other methods mirror Executor but with thread checks
}
```

## Cloud Usage Example

Cloud wraps the scheduler in a `ForegroundExecutor` for single-threaded simplicity (futures need not be `Send`):

```rust
use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, Mutex};

use anyhow::Result;
use futures::future::LocalBoxFuture;

impl SimulatedExecutionContext {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
        // Use ForegroundExecutor for thread safety and simplicity
        let fg_executor = ForegroundExecutor::new(scheduler)?;
        Ok(Self {
            executor: fg_executor,
            session_counter: AtomicUsize::new(0),
            sessions: Mutex::new(HashMap::new()),
            current_session: Mutex::new(None),
        })
    }

    pub fn wait_until(&self, future: LocalBoxFuture<'static, Result<()>>) -> Result<()> {
        // `spawn` returns a `Task`, not a `Result`, so there is nothing to propagate with `?`
        let task = self.executor.spawn(async move { future.await });

        // Direct use of TestScheduler methods via downcast. ForegroundExecutor
        // wraps an Executor, which in turn owns the scheduler.
        if let Some(test_sched) = self.executor.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
            if let Some(session_id) = test_sched.get_current_session() {
                test_sched.track_task_for_session(task.id(), session_id);
                test_sched.add_wait_until_task(session_id, task.id());
            }
        }

        Ok(())
    }
}
```
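
To make the session cleanup rule concrete, the sketch below restates it over plain sets. It is a simplification under stated assumptions: `dangling_tasks` is a hypothetical free function, `usize` stands in for `TaskId`, and completion is modeled as membership in a `completed` set rather than a `TaskState` lookup, so tasks unknown to the scheduler are not treated specially here.

```rust
use std::collections::HashSet;

/// Hypothetical helper: a task is dangling when its session spawned it, it has
/// not completed, and it was not registered through `wait_until`.
pub fn dangling_tasks(
    spawned: &HashSet<usize>,
    wait_until: &HashSet<usize>,
    completed: &HashSet<usize>,
) -> Vec<usize> {
    let mut dangling: Vec<usize> = spawned
        .iter()
        .copied()
        .filter(|id| !completed.contains(id) && !wait_until.contains(id))
        .collect();
    // Sort so the result does not depend on hash iteration order
    dangling.sort_unstable();
    dangling
}
```

For a session that spawned tasks 1, 2, and 3, where task 1 has completed and task 2 was passed to `wait_until`, only task 3 is reported, which is the case `validate_session_cleanup` turns into an error.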

## Key Dependencies and Assumptions

- **parking**: For `Parker`/`Unparker`, created with `parking::pair()`.
- **async_task**: For Runnable wrapping.
- **rand** and **rand_chacha**: For seeded RNG (`ChaCha8Rng`) and shuffling.
- **futures**: For channels.
- **chrono**: For time ranges (optional).
- **anyhow**: For errors.
- **arbitrary**: For the `Arbitrary` derive on `SchedulerFuzzInput` (assumed; needed for fuzzing).
- **std::sync** and **std::thread**: For `Arc`, `Mutex` (not `RwLock`), and thread ID comparison.

The downcasts in the examples require the `Scheduler` trait to expose an `Any` view of itself. If it does not already, add `fn as_any(&self) -> &dyn std::any::Any;` to the trait.
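
The downcasting pattern itself is small enough to show end to end. This is a self-contained sketch, not the real trait: the `Scheduler` trait here has only `as_any`, the `seed` field on `TestScheduler` and the `ProductionScheduler` type are hypothetical, and `test_seed` is an illustrative helper.

```rust
use std::any::Any;
use std::sync::Arc;

/// Reduced stand-in for the shared trait; only the downcasting hook is shown.
pub trait Scheduler: Any {
    fn as_any(&self) -> &dyn Any;
}

pub struct TestScheduler {
    pub seed: u64, // Hypothetical field, for illustration only
}

pub struct ProductionScheduler; // Hypothetical non-test implementation

impl Scheduler for TestScheduler {
    fn as_any(&self) -> &dyn Any { self }
}

impl Scheduler for ProductionScheduler {
    fn as_any(&self) -> &dyn Any { self }
}

/// Returns the seed only when the trait object is really a `TestScheduler`.
pub fn test_seed(scheduler: &Arc<dyn Scheduler>) -> Option<u64> {
    scheduler.as_any().downcast_ref::<TestScheduler>().map(|ts| ts.seed)
}
```

Because `downcast_ref` returns `None` for any other concrete type, wrapper methods such as `Executor::tick` can fall through safely when they are handed a production scheduler.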

This implementation provides the complete unified test core, enabling both GPUI's deterministic UI testing and Cloud's session-aware simulation in a single ~250-line struct, now wrapped by Executors for better encapsulation and thread safety.