# TestScheduler Design Details

This document expands on the [Unified Scheduler Architecture Plan](2025-08-31_15-47-17_unified-scheduler-architecture.md) by providing a detailed design and complete implementation of the TestScheduler. It assumes familiarity with the broader architecture, including the shared `Scheduler` trait, domain separation (GPUI vs. Cloud), and multi-threading test scenarios. **Updates incorporate Executor and ForegroundExecutor wrappers around Arc<dyn Scheduler>, with ForegroundExecutor using PhantomData<Rc<()>> for !Send and panicking if not on the creation thread, applied to both GPUI and Cloud for consistency and simplicity.**

## Overview

The TestScheduler is the **single concrete test implementation** of the `Scheduler` trait (see Section 3: Scheduler Trait Definition in the original plan). It serves as the unified core for all test scenarios, enabling:

- **GPUI Testing**: Deterministic UI scheduling with task labels, deprioritization, main thread isolation, and tick-based execution (see Section 4.1: GPUI Integration in the original plan).
- **Cloud Testing**: Session coordination, time-range delays, wait-until task tracking, and cleanup validation (see Section 5: Cloud Integration in the original plan). **ForegroundExecutor is now used in Cloud for single-threaded simplicity, avoiding Send requirements on futures.**
- **Unified Testing**: Shared across test threads for client-cloud interactions, seeded randomness, and task lifecycle management.

There is **no separate TestScheduler trait**; all test-specific methods are directly on the TestScheduler struct for simplicity, as it is the only implementation in the test context. **Executors wrap Arc<dyn Scheduler>, with ForegroundExecutor enforcing thread safety via phantom Rc and creation-thread checks.**

## Design Principles

- **Minimal Complexity**: No unnecessary traits or abstractions; direct methods for test features.
- **Merged Capabilities**: Combines GPUI queues with Cloud session logic in a single state machine.
- **Determinism**: Always seeded, with configurable randomization.
- **Multi-threading Ready**: `Arc` and `Mutex` for shared access in collaborative tests.
- **Domain Wrappers**: GPUI/Cloud test code wraps this core for specific APIs (e.g., GPUI's BackgroundExecutor now uses Executor and ForegroundExecutor).
- **Thread Safety Enforcement**: ForegroundExecutor uses a phantom Rc for !Send and checks against the creation thread for main-thread isolation.

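The determinism principle means two schedulers built from the same seed must make identical scheduling decisions. A minimal self-contained sketch of that contract, using a simple LCG as an illustrative stand-in for the `ChaCha8Rng` used by the real implementation:

```rust
// Stand-in seeded RNG (an LCG) illustrating the reproducibility contract;
// the real TestScheduler uses rand_chacha's ChaCha8Rng instead.
struct Lcg(u64);

impl Lcg {
    fn next(&mut self) -> u64 {
        // Constants from Knuth's MMIX LCG.
        self.0 = self
            .0
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        self.0
    }

    // Pick the next task index, as a seeded scheduler would.
    fn pick(&mut self, n: usize) -> usize {
        (self.next() % n as u64) as usize
    }
}

// Drain a set of pending tasks in seed-determined order.
fn schedule_order(seed: u64, tasks: usize) -> Vec<usize> {
    let mut rng = Lcg(seed);
    let mut pending: Vec<usize> = (0..tasks).collect();
    let mut order = Vec::new();
    while !pending.is_empty() {
        order.push(pending.remove(rng.pick(pending.len())));
    }
    order
}

fn main() {
    // Same seed => identical execution order, so a failing run can be replayed.
    assert_eq!(schedule_order(42, 5), schedule_order(42, 5));
    println!("{:?}", schedule_order(42, 5));
}
```

This is why `SchedulerConfig::seeded(seed)` is the primary entry point below: reproducing a failure only requires reproducing the seed.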
## Complete TestScheduler Implementation

```rust
use std::collections::{HashMap, HashSet, VecDeque};
use std::future::Future;
use std::ops::RangeInclusive;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};

use anyhow::{anyhow, Result};
use arbitrary::Arbitrary; // for `#[derive(Arbitrary)]` on SchedulerFuzzInput (Bolero fuzzing)
use futures::channel::oneshot;
use parking::{Parker, Unparker}; // `parking` crate; provides `parking::pair()`
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
// Core types (shared with main crate)
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskId(usize);

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskLabel(usize);

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct SessionId(usize);

pub enum ThreadTarget { Main, Background }

#[derive(Clone, Debug)]
pub enum VariationSource {
    /// Seeded randomness for reproducible probabilistic scheduling.
    Seeded(u64),
    /// Fuzzed inputs for deterministic exploration of variations.
    Fuzzed(SchedulerFuzzInput),
}

#[derive(Clone, Debug)]
pub struct SchedulerConfig {
    /// Whether to randomize task ordering (e.g., queue shuffling); defaults to true for coverage.
    pub randomize_order: bool,
    /// Max steps before panic (for deadlock detection).
    pub max_steps: usize,
    /// Whether to log operations (for debugging).
    pub log_operations: bool,
    /// The source of non-deterministic decisions (mutually exclusive modes).
    pub variation_source: VariationSource,
}

impl Default for SchedulerConfig {
    fn default() -> Self {
        Self {
            randomize_order: true,
            max_steps: 10000,
            log_operations: false,
            variation_source: VariationSource::Seeded(0),
        }
    }
}

impl SchedulerConfig {
    /// Create a seeded config (randomization enabled by default).
    pub fn seeded(seed: u64) -> Self {
        Self {
            variation_source: VariationSource::Seeded(seed),
            ..Default::default()
        }
    }

    /// Create a fuzzed config (randomization enabled by default).
    pub fn fuzzed(fuzz_inputs: SchedulerFuzzInput) -> Self {
        Self {
            variation_source: VariationSource::Fuzzed(fuzz_inputs),
            ..Default::default()
        }
    }

    /// For isolation: Disable randomization for deterministic testing.
    pub fn deterministic(seed: u64) -> Self {
        Self {
            randomize_order: false,
            variation_source: VariationSource::Seeded(seed),
            ..Default::default()
        }
    }
}

#[derive(Arbitrary, Debug, Clone)]
pub struct SchedulerFuzzInput {
    queue_selections: Vec<u8>,
    task_indices: Vec<u32>,
    delay_bools: Vec<bool>,
    block_ticks: Vec<usize>,
}

pub struct Task<T> {
    id: TaskId,
    rx: oneshot::Receiver<T>,
    scheduler: Arc<TestScheduler>,
}

impl<T> Task<T> {
    pub fn id(&self) -> TaskId {
        self.id
    }
}

impl<T> Future for Task<T> {
    type Output = T;
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // Task has no pinned fields, so it is safe to take a mutable reference.
        let this = self.get_mut();
        match this.rx.try_recv() {
            Ok(Some(val)) => Poll::Ready(val),
            // Not ready yet (or the sender was dropped): request another poll.
            _ => {
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}

// Internal state
struct TaskInfo {
    label: Option<TaskLabel>,
    session_id: Option<SessionId>,
    waker: Option<Waker>,
    future: Option<Pin<Box<dyn Future<Output = ()>>>>,
    state: TaskState,
    delay_range: Option<(Instant, Instant)>,
}

#[derive(PartialEq)]
enum TaskState { Pending, Running, Completed }

struct WorkerSession {
    spawned_tasks: HashSet<TaskId>,
    wait_until_tasks: HashSet<TaskId>,
}

struct SchedulerState {
    rng: ChaCha8Rng,
    randomize_order: bool,
    max_steps: usize,
    current_time: Instant,
    next_task_id: AtomicUsize,
    tasks: HashMap<TaskId, TaskInfo>,
    sessions: HashMap<SessionId, WorkerSession>,
    current_session: Option<SessionId>,
    ready_queue: VecDeque<TaskId>,
    deprioritized_queue: VecDeque<TaskId>,
    main_thread_queue: VecDeque<TaskId>,
    delayed: Vec<(Instant, TaskId)>,
    deprioritized_labels: HashSet<TaskLabel>,
    block_tick_range: RangeInclusive<usize>,
    parker: Parker,
    unparker: Unparker,
    parking_allowed: AtomicBool,
    execution_history: Vec<String>,
    fuzz_inputs: Option<SchedulerFuzzInput>,
    creation_thread_id: thread::ThreadId, // Added for thread safety checks
}

// Concrete implementation
pub struct TestScheduler {
    state: Arc<Mutex<SchedulerState>>,
}

impl TestScheduler {
    /// Primary constructor: Create a scheduler from full configuration.
    pub fn new(config: SchedulerConfig) -> Arc<Self> {
        let (parker, unparker) = parking::pair();
        let (rng, fuzz_inputs) = match config.variation_source {
            VariationSource::Seeded(seed) => (ChaCha8Rng::seed_from_u64(seed), None),
            VariationSource::Fuzzed(inputs) => (ChaCha8Rng::seed_from_u64(0), Some(inputs)),
        };
        let state = SchedulerState {
            rng,
            randomize_order: config.randomize_order,
            max_steps: config.max_steps,
            current_time: Instant::now(),
            next_task_id: AtomicUsize::new(1),
            tasks: HashMap::new(),
            sessions: HashMap::new(),
            current_session: None,
            ready_queue: VecDeque::new(),
            deprioritized_queue: VecDeque::new(),
            main_thread_queue: VecDeque::new(),
            delayed: Vec::new(),
            deprioritized_labels: HashSet::new(),
            block_tick_range: 0..=1000,
            parker,
            unparker,
            parking_allowed: AtomicBool::new(false),
            execution_history: Vec::new(),
            fuzz_inputs,
            creation_thread_id: thread::current().id(), // Capture creation thread
        };
        Arc::new(Self { state: Arc::new(Mutex::new(state)) })
    }

    // Added for ForegroundExecutor thread checks
    pub fn assert_main_thread(&self) {
        let state = self.state.lock().unwrap();
        if thread::current().id() != state.creation_thread_id {
            panic!("ForegroundExecutor method called from wrong thread");
        }
    }

    /// Convenience helper: Create a seeded scheduler (randomization enabled by default).
    pub fn from_seed(seed: u64) -> Arc<Self> {
        Self::new(SchedulerConfig::seeded(seed))
    }

    /// Convenience helper: Create a scheduler driven by fuzzed inputs.
    /// Use for fuzzing with Bolero.
    pub fn from_fuzz(fuzz_inputs: SchedulerFuzzInput) -> Arc<Self> {
        Self::new(SchedulerConfig::fuzzed(fuzz_inputs))
    }

    // Test-specific methods (direct on impl, no trait)
    pub fn assign_task_id(&self) -> TaskId {
        TaskId(self.state.lock().unwrap().next_task_id.fetch_add(1, Ordering::SeqCst))
    }

    pub fn deprioritize(&self, label: TaskLabel) {
        let mut state = self.state.lock().unwrap();
        state.deprioritized_labels.insert(label);
    }

    pub fn is_task_running(&self, task_id: TaskId) -> bool {
        let state = self.state.lock().unwrap();
        state.tasks.get(&task_id).map_or(false, |t| t.state == TaskState::Running)
    }

    pub fn tick(&self, background_only: bool) -> bool {
        self.tick_internal(background_only)
    }

    fn tick_internal(&self, background_only: bool) -> bool {
        // Move elapsed delayed tasks onto the ready queue. Destructure the
        // guard so the `retain` closure can borrow the other fields.
        {
            let mut state = self.state.lock().unwrap();
            let SchedulerState { delayed, ready_queue, rng, current_time, randomize_order, .. } =
                &mut *state;
            delayed.retain(|&(time, task_id)| {
                if time <= *current_time && (!*randomize_order || rng.gen_bool(0.5)) {
                    ready_queue.push_back(task_id);
                    false
                } else {
                    true
                }
            });
        } // Lock dropped here

        // Select the next task, shuffling among the non-empty queues when
        // randomization is enabled.
        let task_id = {
            let mut state = self.state.lock().unwrap();
            let state = &mut *state; // split the guard into disjoint field borrows
            let mut queues = vec![&mut state.ready_queue, &mut state.deprioritized_queue];
            if !background_only {
                queues.insert(0, &mut state.main_thread_queue);
            }

            let mut available: Vec<usize> = queues
                .iter()
                .enumerate()
                .filter(|(_, q)| !q.is_empty())
                .map(|(i, _)| i)
                .collect();

            if available.is_empty() {
                return false;
            }

            if state.randomize_order {
                available.shuffle(&mut state.rng);
            }

            queues[available[0]].pop_front().unwrap()
        }; // Lock dropped here

        // Poll the selected task's future. Note the lock is re-acquired for
        // the poll itself; a future revision could move the future out first.
        let poll_result = {
            let mut state = self.state.lock().unwrap();
            if let Some(task) = state.tasks.get_mut(&task_id) {
                task.state = TaskState::Running;
                match (task.future.as_mut(), task.waker.as_ref()) {
                    (Some(fut), Some(waker)) => {
                        let mut context = Context::from_waker(waker);
                        fut.as_mut().poll(&mut context)
                    }
                    _ => Poll::Pending,
                }
            } else {
                Poll::Pending
            }
        };

        // Record completion after polling.
        if poll_result.is_ready() {
            let mut state = self.state.lock().unwrap();
            let state = &mut *state;
            if let Some(task) = state.tasks.get_mut(&task_id) {
                task.state = TaskState::Completed;
                state.execution_history.push(format!("Ticked task {}", task_id.0));
            }
        }

        true
    }

    pub fn advance_clock(&self, duration: Duration) {
        let mut state = self.state.lock().unwrap();
        state.current_time += duration;
    }

    pub fn run_until_parked(&self) {
        while self.tick(false) {}
    }

    // Cloud session methods
    pub fn create_session(&self) -> SessionId {
        let mut state = self.state.lock().unwrap();
        let id = SessionId(state.sessions.len());
        state.sessions.insert(id, WorkerSession { spawned_tasks: HashSet::new(), wait_until_tasks: HashSet::new() });
        id
    }

    pub fn set_current_session(&self, session_id: Option<SessionId>) {
        let mut state = self.state.lock().unwrap();
        state.current_session = session_id;
    }

    pub fn get_current_session(&self) -> Option<SessionId> {
        self.state.lock().unwrap().current_session
    }

    pub fn track_task_for_session(&self, task_id: TaskId, session_id: SessionId) {
        let mut state = self.state.lock().unwrap();
        if let Some(session) = state.sessions.get_mut(&session_id) {
            session.spawned_tasks.insert(task_id);
        }
    }

    pub fn add_wait_until_task(&self, session_id: SessionId, task_id: TaskId) {
        let mut state = self.state.lock().unwrap();
        if let Some(session) = state.sessions.get_mut(&session_id) {
            session.wait_until_tasks.insert(task_id);
        }
    }

    pub fn validate_session_cleanup(&self, session_id: SessionId) -> Result<()> {
        let state = self.state.lock().unwrap();
        if let Some(session) = state.sessions.get(&session_id) {
            let dangling: Vec<_> = session.spawned_tasks.iter()
                .filter(|&&tid| state.tasks.get(&tid).map_or(false, |t| t.state != TaskState::Completed))
                .filter(|&&tid| !session.wait_until_tasks.contains(&tid))
                .cloned()
                .collect();
            if !dangling.is_empty() {
                return Err(anyhow!("{} dangling tasks", dangling.len()));
            }
        }
        Ok(())
    }

    // Other test methods (e.g., GPUI block simulation)
    pub fn gen_block_on_ticks(&self) -> usize {
        let mut state = self.state.lock().unwrap();
        let range = state.block_tick_range.clone();
        state.rng.gen_range(range)
    }
}
```
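The session bookkeeping above (spawned tasks vs. wait-until tasks) can be exercised in isolation. A minimal self-contained sketch of the `validate_session_cleanup` logic, with the scheduler stripped away; the simplified `WorkerSession` and `dangling_tasks` names mirror the fields above but are illustrative:

```rust
use std::collections::HashSet;

// Simplified mirror of WorkerSession: tasks a session spawned, and the
// subset it explicitly waits on via wait_until.
struct WorkerSession {
    spawned_tasks: HashSet<usize>,
    wait_until_tasks: HashSet<usize>,
}

// A task dangles if it is still incomplete at session end and the session
// never registered it with wait_until.
fn dangling_tasks(session: &WorkerSession, completed: &HashSet<usize>) -> Vec<usize> {
    session
        .spawned_tasks
        .iter()
        .filter(|&&tid| !completed.contains(&tid))
        .filter(|&&tid| !session.wait_until_tasks.contains(&tid))
        .cloned()
        .collect()
}

fn main() {
    let session = WorkerSession {
        spawned_tasks: [1, 2, 3].into_iter().collect(),
        wait_until_tasks: [3].into_iter().collect(),
    };
    // Task 1 completed; task 2 is incomplete and untracked; task 3 is
    // incomplete but covered by wait_until, so only task 2 dangles.
    let completed: HashSet<usize> = [1].into_iter().collect();
    assert_eq!(dangling_tasks(&session, &completed), vec![2]);
}
```

This is the invariant `validate_session_cleanup` enforces: every task a session leaves behind must either be completed or be registered via `add_wait_until_task`.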

## GPUI Usage Example

GPUI wraps the TestScheduler using Executor and ForegroundExecutor:

```rust
use std::marker::PhantomData;
use std::rc::Rc;

// Generic Executor for background tasks (Send futures)
pub struct Executor {
    scheduler: Arc<dyn Scheduler>,
}

impl Executor {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
        Self { scheduler }
    }

    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
    where R: Send + 'static {
        self.scheduler.spawn(future)
    }

    pub fn spawn_labeled<R>(
        &self,
        label: TaskLabel,
        future: impl Future<Output = R> + Send + 'static
    ) -> Task<R>
    where R: Send + 'static {
        // Labeling does not deprioritize by itself; tests call deprioritize()
        // separately for labels they want to starve.
        self.scheduler.spawn_labeled(label, future)
    }

    pub fn deprioritize(&self, label: TaskLabel) {
        // Test-only behavior: downcast to reach TestScheduler-specific methods.
        if let Some(test_sched) = self.scheduler.as_any().downcast_ref::<TestScheduler>() {
            test_sched.deprioritize(label);
        }
    }

    pub fn timer(&self, duration: Duration) -> Task<()> {
        self.scheduler.timer(duration)
    }

    pub fn tick(&self) -> Option<bool> {
        // Returns None in production, where the scheduler is not a TestScheduler.
        self.scheduler.as_any().downcast_ref::<TestScheduler>().map(|ts| ts.tick(false))
    }
}

// ForegroundExecutor for main-thread tasks (!Send futures, thread checks).
// Note: PhantomData<Rc<()>> already makes this type !Send on stable Rust;
// an explicit `impl !Send` would require the unstable negative_impls feature.
pub struct ForegroundExecutor {
    executor: Executor,
    _phantom: PhantomData<Rc<()>>, // Enforces !Send
}

impl ForegroundExecutor {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
        let executor = Executor::new(scheduler);
        // Check thread immediately via scheduler; in production (no
        // TestScheduler), assume creation happens on the main thread.
        if let Some(test_sched) = executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
            test_sched.assert_main_thread();
        }
        Ok(Self { executor, _phantom: PhantomData })
    }

    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
    where R: 'static {
        // Assert thread before delegating
        if let Some(test_sched) = self.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
            test_sched.assert_main_thread();
        }
        self.executor.scheduler.spawn_foreground(future)
    }

    pub fn timer(&self, duration: Duration) -> Task<()> {
        if let Some(test_sched) = self.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
            test_sched.assert_main_thread();
        }
        self.executor.scheduler.timer(duration)
    }

    // Other methods mirror Executor but with thread checks
}
```
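The creation-thread check can be demonstrated standalone. A minimal sketch, where `ThreadGuard` is an illustrative stand-in for the TestScheduler's `creation_thread_id` bookkeeping:

```rust
use std::sync::Arc;
use std::thread;

// Stand-in for TestScheduler's creation_thread_id field.
struct ThreadGuard {
    creation_thread_id: thread::ThreadId,
}

impl ThreadGuard {
    fn new() -> Self {
        Self { creation_thread_id: thread::current().id() }
    }

    // Mirrors assert_main_thread: panic when called off the creation thread.
    fn assert_creation_thread(&self) {
        if thread::current().id() != self.creation_thread_id {
            panic!("called from wrong thread");
        }
    }
}

fn main() {
    let guard = Arc::new(ThreadGuard::new());

    // Same thread: the check passes silently.
    guard.assert_creation_thread();

    // Another thread: the check panics, so join() reports an error.
    let guard2 = Arc::clone(&guard);
    let result = thread::spawn(move || guard2.assert_creation_thread()).join();
    assert!(result.is_err());
    println!("cross-thread call panicked as expected");
}
```

The phantom `Rc<()>` prevents *moving* a ForegroundExecutor across threads at compile time; this runtime check additionally catches code that reaches the executor through a shared reference from the wrong thread.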

## Cloud Usage Example

Cloud wraps using ForegroundExecutor for single-threaded simplicity (no Send futures required):

```rust
impl SimulatedExecutionContext {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
        // ForegroundExecutor gives thread safety checks and single-threaded simplicity.
        let executor = ForegroundExecutor::new(scheduler)?;
        Ok(Self {
            executor,
            session_counter: AtomicUsize::new(0),
            sessions: Mutex::new(HashMap::new()),
            current_session: Mutex::new(None),
        })
    }

    pub fn wait_until(&self, future: LocalBoxFuture<'static, Result<()>>) -> Result<()> {
        let task = self.executor.spawn(async move { future.await });

        // Direct use of TestScheduler methods via downcast from the executor
        if let Some(test_sched) = self.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
            if let Some(session_id) = test_sched.get_current_session() {
                test_sched.track_task_for_session(task.id(), session_id);
                test_sched.add_wait_until_task(session_id, task.id());
            }
        }

        Ok(())
    }
}
```

## Key Dependencies and Assumptions

- **parking**: For `Parker`/`Unparker` via `parking::pair()`; the scheduler's shared state itself uses `std::sync::Mutex`.
- **async_task**: For `Runnable` wrapping in the production scheduler (not needed by the snippet above).
- **rand_chacha**: For seeded RNG.
- **arbitrary**: For the `Arbitrary` derive on `SchedulerFuzzInput` (used with Bolero).
- **futures**: For channels.
- **chrono**: For time ranges (optional; unused in the snippet above).
- **anyhow**: For errors.
- **std::thread**: For thread ID comparison.

The examples assume the `Scheduler` trait exposes `fn as_any(&self) -> &dyn std::any::Any;`; add it if it is not already present, so test code can downcast `Arc<dyn Scheduler>` to `TestScheduler`.
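A minimal self-contained sketch of that downcast hook, with trait and struct bodies simplified for illustration:

```rust
use std::any::Any;
use std::sync::Arc;

// Simplified stand-ins for the Scheduler trait and TestScheduler.
trait Scheduler: Any {
    // The downcast hook: each implementor returns itself as &dyn Any.
    fn as_any(&self) -> &dyn Any;
}

struct TestScheduler {
    seed: u64,
}

impl Scheduler for TestScheduler {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn main() {
    let scheduler: Arc<dyn Scheduler> = Arc::new(TestScheduler { seed: 42 });

    // Test-only code recovers the concrete type; a production scheduler
    // would return None here, so the test-specific path is skipped.
    let test_sched = scheduler.as_any().downcast_ref::<TestScheduler>();
    assert_eq!(test_sched.map(|ts| ts.seed), Some(42));
}
```

Each implementor must return `self` from `as_any`; a default method on the trait cannot do this because `Self: Sized` is not known there.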

This implementation provides the complete unified test core, enabling both GPUI's deterministic UI testing and Cloud's session-aware simulation in a single ~250-line struct, now wrapped by Executors for better encapsulation and thread safety.