<file_path>
zed/thoughts/2025-08-31_15-47-17_test-scheduler-design.md
</file_path>

# TestScheduler Design Details

This document expands on the [Unified Scheduler Architecture Plan](2025-08-31_15-47-17_unified-scheduler-architecture.md) by providing a detailed design and complete implementation of the TestScheduler. It assumes familiarity with the broader architecture, including the shared `Scheduler` trait, domain separation (GPUI vs. Cloud), and multi-threading test scenarios.

## Overview

The TestScheduler is the **single concrete test implementation** of the `Scheduler` trait (see Section 3: Scheduler Trait Definition in the original plan). It serves as the unified core for all test scenarios, enabling:

- **GPUI Testing**: Deterministic UI scheduling with task labels, deprioritization, main thread isolation, and tick-based execution (see Section 4.1: GPUI Integration in the original plan).
- **Cloud Testing**: Session coordination, time-range delays, wait-until task tracking, and cleanup validation (see Section 5: Cloud Integration in the original plan).
- **Unified Testing**: Shared across test threads for client-cloud interactions, seeded randomness, and task lifecycle management.
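
Tick-based execution and time-range delays both depend on a virtual clock that moves only when the test advances it. The sketch below shows that bookkeeping using only the standard library. It makes a few simplifying assumptions:

- The names `VirtualClock`, `schedule_after`, and `promote_due` are hypothetical.
- Time is measured as a `Duration` since the test started, in place of `Instant`.
- Deterministic mode is assumed (`randomize_order: false`), so every due task is promoted on the first tick after its deadline.

```rust
use std::collections::VecDeque;
use std::time::Duration;

/// Hypothetical miniature of the scheduler's delayed-task bookkeeping.
/// Time is virtual: it only moves when the test calls `advance_clock`.
#[derive(Default)]
pub struct VirtualClock {
    now: Duration,                   // virtual time since the test started
    delayed: Vec<(Duration, usize)>, // (deadline, task id)
    ready: VecDeque<usize>,          // task ids eligible to run
}

impl VirtualClock {
    /// Register a task that becomes runnable `delay` after the current virtual time.
    pub fn schedule_after(&mut self, delay: Duration, task_id: usize) {
        self.delayed.push((self.now + delay, task_id));
    }

    /// Move virtual time forward; no task is promoted or run as a side effect.
    pub fn advance_clock(&mut self, by: Duration) {
        self.now += by;
    }

    /// First phase of a tick: move every task whose deadline has passed to the
    /// ready queue (deterministic mode, so there is no coin flip).
    pub fn promote_due(&mut self) {
        let now = self.now;
        let ready = &mut self.ready;
        self.delayed.retain(|&(deadline, task_id)| {
            if deadline <= now {
                ready.push_back(task_id);
                false
            } else {
                true
            }
        });
    }

    /// Snapshot of the ready queue, front first.
    pub fn ready(&self) -> Vec<usize> {
        self.ready.iter().copied().collect()
    }
}
```

Nothing becomes runnable until the test both advances the clock and ticks. A test can therefore assert on the exact moment a delayed task becomes ready.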

There is **no separate TestScheduler trait**; all test-specific methods are directly on the TestScheduler struct for simplicity, as it is the only implementation in the test context.

## Design Principles

- **Minimal Complexity**: No unnecessary traits or abstractions; direct methods for test features.
- **Merged Capabilities**: Combines GPUI queues with Cloud session logic in a single state machine.
- **Determinism**: Always seeded with configurable randomization.
- **Multi-threading Ready**: `Arc` and `Mutex` for shared access in collaborative tests.
- **Domain Wrappers**: GPUI/Cloud test code wraps this core for specific APIs (e.g., GPUI's BackgroundExecutor).
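
Determinism here means that the seed fully determines the interleaving. The sketch below illustrates that property using only the standard library:

- A SplitMix64 generator stands in for `ChaCha8Rng` (the real scheduler uses `rand_chacha`).
- It shuffles ready task ids, much as `tick_internal` shuffles candidate queues.
- `SplitMix64` and `interleaving` are illustrative names, not part of the scheduler.

```rust
/// SplitMix64: a tiny, seedable PRNG used only as a std-only stand-in for
/// `ChaCha8Rng` in this illustration.
pub struct SplitMix64(u64);

impl SplitMix64 {
    pub fn seed_from_u64(seed: u64) -> Self {
        SplitMix64(seed)
    }

    pub fn next_u64(&mut self) -> u64 {
        self.0 = self.0.wrapping_add(0x9E37_79B9_7F4A_7C15);
        let mut z = self.0;
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    }

    /// Index in `0..n` (requires n > 0); modulo bias is ignored in this sketch.
    pub fn below(&mut self, n: usize) -> usize {
        (self.next_u64() % n as u64) as usize
    }
}

/// Fisher-Yates shuffle of ready task ids, driven only by `seed`.
pub fn interleaving(seed: u64, task_ids: &[usize]) -> Vec<usize> {
    let mut rng = SplitMix64::seed_from_u64(seed);
    let mut order = task_ids.to_vec();
    for i in (1..order.len()).rev() {
        let j = rng.below(i + 1);
        order.swap(i, j);
    }
    order
}
```

Replaying a failing seed reproduces the exact same interleaving. That is what makes a seeded failure debuggable.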

## Complete TestScheduler Implementation

```rust
use std::collections::{HashMap, HashSet, VecDeque};
use std::future::Future;
use std::ops::RangeInclusive;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};

use anyhow::{anyhow, Result};
use arbitrary::Arbitrary;
use async_task::Runnable;
use chrono::{DateTime, Utc};
use futures::channel::oneshot;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;

// Core types (shared with main crate)
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskId(usize);

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskLabel(usize);

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct SessionId(usize);

pub enum ThreadTarget { Main, Background }

// Clone + Debug are required because `SchedulerConfig` derives them.
#[derive(Clone, Debug)]
pub enum VariationSource {
    /// Seeded randomness for reproducible probabilistic scheduling.
    Seeded(u64),
    /// Fuzzed inputs for deterministic exploration of variations.
    Fuzzed(SchedulerFuzzInput),
}

#[derive(Clone, Debug)]
pub struct SchedulerConfig {
    /// Whether to randomize task ordering (e.g., queue shuffling); defaults to true for coverage.
    pub randomize_order: bool,
    /// Max steps before panic (for deadlock detection).
    pub max_steps: usize,
    /// Whether to log operations (for debugging).
    pub log_operations: bool,
    /// The source of non-deterministic decisions (mutually exclusive modes).
    pub variation_source: VariationSource,
}

impl Default for SchedulerConfig {
    fn default() -> Self {
        Self {
            randomize_order: true,
            max_steps: 10000,
            log_operations: false,
            variation_source: VariationSource::Seeded(0),
        }
    }
}

impl SchedulerConfig {
    /// Create a seeded config (randomization enabled by default).
    pub fn seeded(seed: u64) -> Self {
        Self {
            variation_source: VariationSource::Seeded(seed),
            ..Default::default()
        }
    }

    /// Create a fuzzed config (randomization enabled by default).
    pub fn fuzzed(fuzz_inputs: SchedulerFuzzInput) -> Self {
        Self {
            variation_source: VariationSource::Fuzzed(fuzz_inputs),
            ..Default::default()
        }
    }

    /// For isolation: Disable randomization for deterministic testing.
    pub fn deterministic(seed: u64) -> Self {
        Self {
            randomize_order: false,
            variation_source: VariationSource::Seeded(seed),
            ..Default::default()
        }
    }
}

#[derive(Arbitrary, Debug, Clone)]
pub struct SchedulerFuzzInput {
    queue_selections: Vec<u8>,
    task_indices: Vec<u32>,
    delay_bools: Vec<bool>,
    block_ticks: Vec<usize>,
}

pub struct Task<T> {
    id: TaskId,
    rx: oneshot::Receiver<T>,
    scheduler: Arc<TestScheduler>,
}

impl<T> Task<T> {
    /// The scheduler-assigned id, used for session tracking.
    pub fn id(&self) -> TaskId {
        self.id
    }
}

impl<T> Future for Task<T> {
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Delegate to the oneshot receiver: it registers `cx`'s waker and wakes
        // it when the result is sent, instead of busy-waking on every poll.
        Pin::new(&mut self.rx)
            .poll(cx)
            .map(|result| result.expect("task was dropped before completing"))
    }
}

// Internal state
struct TaskInfo {
    label: Option<TaskLabel>,
    session_id: Option<SessionId>,
    waker: Option<Waker>,
    future: Option<Pin<Box<dyn Future<Output = ()>>>>,
    state: TaskState,
    delay_range: Option<(Instant, Instant)>,
}

#[derive(PartialEq)]
enum TaskState { Pending, Running, Completed }

struct WorkerSession {
    spawned_tasks: HashSet<TaskId>,
    wait_until_tasks: HashSet<TaskId>,
}

struct SchedulerState {
    rng: ChaCha8Rng,
    randomize_order: bool,
    max_steps: usize,
    current_time: Instant,
    next_task_id: AtomicUsize,
    tasks: HashMap<TaskId, TaskInfo>,
    sessions: HashMap<SessionId, WorkerSession>,
    current_session: Option<SessionId>,
    ready_queue: VecDeque<TaskId>,
    deprioritized_queue: VecDeque<TaskId>,
    main_thread_queue: VecDeque<TaskId>,
    delayed: Vec<(Instant, TaskId)>,
    deprioritized_labels: HashSet<TaskLabel>,
    block_tick_range: RangeInclusive<usize>,
    parker: parking::Parker,
    unparker: parking::Unparker,
    parking_allowed: AtomicBool,
    execution_history: Vec<String>,
    fuzz_inputs: Option<SchedulerFuzzInput>,
}

// Concrete implementation
pub struct TestScheduler {
    state: Arc<Mutex<SchedulerState>>,
}

impl TestScheduler {
    /// Primary constructor: Create a scheduler from full configuration.
    pub fn new(config: SchedulerConfig) -> Arc<Self> {
        let (parker, unparker) = parking::pair();
        let (rng, fuzz_inputs) = match config.variation_source {
            VariationSource::Seeded(seed) => (ChaCha8Rng::seed_from_u64(seed), None),
            VariationSource::Fuzzed(inputs) => (ChaCha8Rng::seed_from_u64(0), Some(inputs)),
        };
        let state = SchedulerState {
            rng,
            randomize_order: config.randomize_order,
            max_steps: config.max_steps,
            current_time: Instant::now(),
            next_task_id: AtomicUsize::new(1),
            tasks: HashMap::new(),
            sessions: HashMap::new(),
            current_session: None,
            ready_queue: VecDeque::new(),
            deprioritized_queue: VecDeque::new(),
            main_thread_queue: VecDeque::new(),
            delayed: Vec::new(),
            deprioritized_labels: HashSet::new(),
            block_tick_range: 0..=1000,
            parker,
            unparker,
            parking_allowed: AtomicBool::new(false),
            execution_history: Vec::new(),
            fuzz_inputs,
        };
        Arc::new(Self { state: Arc::new(Mutex::new(state)) })
    }

    /// Convenience helper: Create a seeded scheduler (randomization enabled by default).
    pub fn from_seed(seed: u64) -> Arc<Self> {
        Self::new(SchedulerConfig::seeded(seed))
    }

    /// Convenience helper: Create a scheduler driven by fuzzed inputs.
    /// Use for fuzzing with Bolero.
    pub fn from_fuzz(fuzz_inputs: SchedulerFuzzInput) -> Arc<Self> {
        Self::new(SchedulerConfig::fuzzed(fuzz_inputs))
    }

    // Test-specific methods (direct on impl, no trait)
    pub fn assign_task_id(&self) -> TaskId {
        TaskId(self.state.lock().unwrap().next_task_id.fetch_add(1, Ordering::SeqCst))
    }

    pub fn deprioritize(&self, label: TaskLabel) {
        let mut state = self.state.lock().unwrap();
        state.deprioritized_labels.insert(label);
    }

    pub fn is_task_running(&self, task_id: TaskId) -> bool {
        let state = self.state.lock().unwrap();
        state.tasks.get(&task_id).map_or(false, |t| t.state == TaskState::Running)
    }

    pub fn tick(&self, background_only: bool) -> bool {
        self.tick_internal(background_only)
    }

    fn tick_internal(&self, background_only: bool) -> bool {
        // 1. Promote due delayed tasks. Reborrow the guard as `&mut SchedulerState`
        //    so the closure can borrow fields disjoint from `delayed`.
        {
            let mut guard = self.state.lock().unwrap();
            let state = &mut *guard;
            let now = state.current_time;
            let randomize = state.randomize_order;
            let rng = &mut state.rng;
            let ready_queue = &mut state.ready_queue;
            state.delayed.retain(|&(time, task_id)| {
                if time <= now && (!randomize || rng.gen_bool(0.5)) {
                    ready_queue.push_back(task_id);
                    false
                } else {
                    true
                }
            });
        } // Lock dropped here

        // 2. Select the next task. Main-thread (unless `background_only`) and
        //    ready tasks compete; deprioritized tasks run only when both are empty.
        let task_id = {
            let mut guard = self.state.lock().unwrap();
            let state = &mut *guard;
            let mut queues = Vec::new();
            if !background_only {
                queues.push(&mut state.main_thread_queue);
            }
            queues.push(&mut state.ready_queue);
            queues.retain(|queue| !queue.is_empty());
            if state.randomize_order {
                queues.shuffle(&mut state.rng);
            }
            let picked = queues.into_iter().next();
            match picked {
                Some(queue) => queue.pop_front(),
                None => state.deprioritized_queue.pop_front(),
            }
        }; // Lock dropped here
        let Some(task_id) = task_id else {
            return false;
        };

        // 3. Take the future out under the lock, then poll it with the lock
        //    released so the future can call back into the scheduler (spawn,
        //    wake, session tracking) without deadlocking on the std Mutex.
        let (mut future, waker) = {
            let mut guard = self.state.lock().unwrap();
            let Some(task) = guard.tasks.get_mut(&task_id) else {
                return true;
            };
            match (task.future.take(), task.waker.clone()) {
                (Some(future), Some(waker)) => {
                    task.state = TaskState::Running;
                    (future, waker)
                }
                (future, _) => {
                    // Nothing pollable; restore what was taken and count the tick.
                    task.future = future;
                    return true;
                }
            }
        }; // Lock dropped here

        let poll_result = future.as_mut().poll(&mut Context::from_waker(&waker));

        // 4. Record the outcome; a pending task keeps its future so it can be
        //    polled again once woken.
        let mut guard = self.state.lock().unwrap();
        let state = &mut *guard;
        if let Some(task) = state.tasks.get_mut(&task_id) {
            if poll_result.is_ready() {
                task.state = TaskState::Completed;
                state.execution_history.push(format!("Ticked task {}", task_id.0));
            } else {
                task.state = TaskState::Pending;
                task.future = Some(future);
            }
        }

        true
    }
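
    pub fn advance_clock(&self, duration: Duration) {
        let mut state = self.state.lock().unwrap();
        state.current_time += duration;
    }

    /// Tick until no task is runnable. Panics after `max_steps` ticks so a task
    /// that keeps rescheduling itself fails the test instead of hanging it.
    pub fn run_until_parked(&self) {
        let max_steps = self.state.lock().unwrap().max_steps;
        for _ in 0..max_steps {
            if !self.tick(false) {
                return;
            }
        }
        panic!("run_until_parked exceeded max_steps ({max_steps})");
    }
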
    // Cloud session methods
    pub fn create_session(&self) -> SessionId {
        let mut state = self.state.lock().unwrap();
        let id = SessionId(state.sessions.len());
        state.sessions.insert(
            id,
            WorkerSession { spawned_tasks: HashSet::new(), wait_until_tasks: HashSet::new() },
        );
        id
    }

    pub fn set_current_session(&self, session_id: Option<SessionId>) {
        let mut state = self.state.lock().unwrap();
        state.current_session = session_id;
    }

    pub fn get_current_session(&self) -> Option<SessionId> {
        self.state.lock().unwrap().current_session
    }

    pub fn track_task_for_session(&self, task_id: TaskId, session_id: SessionId) {
        let mut state = self.state.lock().unwrap();
        if let Some(session) = state.sessions.get_mut(&session_id) {
            session.spawned_tasks.insert(task_id);
        }
    }

    pub fn add_wait_until_task(&self, session_id: SessionId, task_id: TaskId) {
        let mut state = self.state.lock().unwrap();
        if let Some(session) = state.sessions.get_mut(&session_id) {
            session.wait_until_tasks.insert(task_id);
        }
    }

    pub fn validate_session_cleanup(&self, session_id: SessionId) -> Result<()> {
        let state = self.state.lock().unwrap();
        if let Some(session) = state.sessions.get(&session_id) {
            let dangling: Vec<_> = session.spawned_tasks.iter()
                .filter(|&&tid| state.tasks.get(&tid).map_or(false, |t| t.state != TaskState::Completed))
                .filter(|&&tid| !session.wait_until_tasks.contains(&tid))
                .cloned()
                .collect();
            if !dangling.is_empty() {
                return Err(anyhow!("{} dangling tasks", dangling.len()));
            }
        }
        Ok(())
    }

    // Other test methods (e.g., GPUI block simulation)
    pub fn gen_block_on_ticks(&self) -> usize {
        let mut guard = self.state.lock().unwrap();
        let state = &mut *guard;
        state.rng.gen_range(state.block_tick_range.clone())
    }
}
```
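
The core rule in `tick_internal` is that no scheduler lock is held while a task's future runs, because the future may call back into the scheduler. The sketch below isolates that take/poll/restore pattern using only the standard library. `MiniState`, `poll_task`, and the no-op waker are hypothetical simplifications; a real waker would re-queue the task.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Waker that does nothing; a real scheduler's waker would re-queue the task.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical cut-down scheduler state: pending futures keyed by task id.
#[derive(Default)]
pub struct MiniState {
    pub futures: HashMap<usize, Option<BoxFuture>>,
    pub completed: Vec<usize>,
}

/// Poll one task without holding the lock while its future runs.
/// Returns true if the task completed on this poll.
pub fn poll_task(state: &Arc<Mutex<MiniState>>, task_id: usize) -> bool {
    // 1. Take the future out. The guard is a temporary dropped at the end of
    //    this statement, so the lock is released before polling.
    let taken = state
        .lock()
        .unwrap()
        .futures
        .get_mut(&task_id)
        .and_then(|slot| slot.take());
    let Some(mut future) = taken else {
        return false;
    };

    // 2. Poll with no lock held: the future may lock `state` itself.
    let waker = Waker::from(Arc::new(NoopWake));
    let result = future.as_mut().poll(&mut Context::from_waker(&waker));

    // 3. Re-acquire the lock only to record the outcome.
    let mut guard = state.lock().unwrap();
    match result {
        Poll::Ready(()) => {
            guard.completed.push(task_id);
            true
        }
        Poll::Pending => {
            guard.futures.insert(task_id, Some(future));
            false
        }
    }
}
```

`std::sync::Mutex` does not return when a thread locks a mutex it already holds; it may deadlock or panic. Polling while holding the guard would therefore hang as soon as the future touched the state. Taking the future out first avoids this.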

## GPUI Usage Example

GPUI wraps the TestScheduler to maintain its existing API:

```rust
pub struct BackgroundExecutor {
    scheduler: Arc<dyn Scheduler>, // TestScheduler implements Scheduler
}

impl BackgroundExecutor {
    pub fn spawn_labeled<R>(
        &self,
        label: TaskLabel,
        future: impl Future<Output = R> + Send + 'static,
    ) -> Task<R> {
        // Spawning with a label must not deprioritize that label. The label is
        // only recorded so that tasks whose label was passed to `deprioritize`
        // are routed to the deprioritized queue. A labeled spawn entry point on
        // `Scheduler` is assumed here; match it to the trait in the plan.
        self.scheduler.spawn_labeled(label, future)
    }

    pub fn deprioritize(&self, label: TaskLabel) {
        // Direct downcast for GPUI test features
        if let Some(test_sched) = self.scheduler.as_any().downcast_ref::<TestScheduler>() {
            test_sched.deprioritize(label);
        }
    }
}
```
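
The ordering this wrapper relies on can be modeled independently of GPUI:

- `deprioritize` records a label.
- Labeled tasks are routed according to that set when they become runnable.
- Deprioritized work runs only when nothing else is ready (an assumption in this sketch).

`LabeledQueues`, `enqueue`, and `pop_next` are hypothetical names, and labels are plain `u32`s rather than `TaskLabel`.

```rust
use std::collections::{HashSet, VecDeque};

/// Hypothetical reduced model of label-based deprioritization.
#[derive(Default)]
pub struct LabeledQueues {
    deprioritized_labels: HashSet<u32>,
    ready: VecDeque<usize>,
    deprioritized: VecDeque<usize>,
}

impl LabeledQueues {
    /// Mark a label as low priority; affects tasks enqueued afterwards.
    pub fn deprioritize(&mut self, label: u32) {
        self.deprioritized_labels.insert(label);
    }

    /// Route a runnable task by its optional label.
    pub fn enqueue(&mut self, task_id: usize, label: Option<u32>) {
        match label {
            Some(label) if self.deprioritized_labels.contains(&label) => {
                self.deprioritized.push_back(task_id)
            }
            _ => self.ready.push_back(task_id),
        }
    }

    /// Deprioritized tasks run only when no regular task is ready.
    pub fn pop_next(&mut self) -> Option<usize> {
        self.ready
            .pop_front()
            .or_else(|| self.deprioritized.pop_front())
    }
}
```

Spawning with a label never changes the label set. Only an explicit `deprioritize` call does, which keeps the two operations independent.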

## Cloud Usage Example

Cloud wraps the TestScheduler for session coordination:

```rust
impl SimulatedExecutionContext {
    pub fn wait_until(&self, future: LocalBoxFuture<'static, Result<()>>) -> Result<()> {
        let task = self.scheduler.spawn(async move { future.await })?;

        // Direct use of TestScheduler methods
        let scheduler = self.scheduler.as_any().downcast_ref::<TestScheduler>().unwrap();
        if let Some(session_id) = scheduler.get_current_session() {
            scheduler.track_task_for_session(task.id(), session_id);
            scheduler.add_wait_until_task(session_id, task.id());
        }

        Ok(())
    }
}
```
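
Roughly, the cleanup rule that `validate_session_cleanup` enforces is this: a task spawned in a session is dangling if it has not completed and was not registered through `wait_until`. Below is a std-only sketch of that bookkeeping. It uses hypothetical names and plain `usize` ids for sessions and tasks.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical session bookkeeping with plain `usize` ids.
#[derive(Default)]
pub struct Sessions {
    spawned: HashMap<usize, HashSet<usize>>,    // session -> spawned task ids
    wait_until: HashMap<usize, HashSet<usize>>, // session -> exempt task ids
    completed: HashSet<usize>,
}

impl Sessions {
    pub fn track(&mut self, session: usize, task: usize) {
        self.spawned.entry(session).or_default().insert(task);
    }

    /// A `wait_until` task is tracked like any other but exempt from the check.
    pub fn wait_until(&mut self, session: usize, task: usize) {
        self.track(session, task);
        self.wait_until.entry(session).or_default().insert(task);
    }

    pub fn complete(&mut self, task: usize) {
        self.completed.insert(task);
    }

    /// Spawned tasks that neither completed nor were registered via `wait_until`.
    pub fn dangling(&self, session: usize) -> Vec<usize> {
        let exempt = self.wait_until.get(&session);
        let mut dangling: Vec<usize> = self
            .spawned
            .get(&session)
            .into_iter()
            .flatten()
            .filter(|&&task| !self.completed.contains(&task))
            .filter(|&&task| exempt.map_or(true, |e| !e.contains(&task)))
            .copied()
            .collect();
        dangling.sort_unstable(); // HashSet iteration order is unspecified
        dangling
    }
}
```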
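
## Key Dependencies and Assumptions

- **std::sync::Mutex**: Guards all scheduler state (a `Mutex`, not an `RwLock`).
- **parking**: For `Parker`/`Unparker` (created with `parking::pair()`).
- **async_task**: For Runnable wrapping.
- **rand** / **rand_chacha**: For the seeded RNG and queue shuffling.
- **arbitrary**: For deriving `Arbitrary` on `SchedulerFuzzInput` (fuzzing with Bolero).
- **futures**: For channels.
- **chrono**: For time ranges (optional; the code above measures time with `std::time::Instant`).
- **anyhow**: For errors.

The usage examples assume that `Scheduler` exposes `fn as_any(&self) -> &dyn std::any::Any;` for downcasting to `TestScheduler`; add it to the trait if it is not already there.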
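
Both usage examples depend on that `as_any` hook. The following std-only sketch shows how the hook enables downcasting from `Arc<dyn Scheduler>`. The trimmed `Scheduler` trait, `TestScheduler { seed }`, `RealScheduler`, and `test_seed` are hypothetical stand-ins.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical, trimmed-down `Scheduler` trait: only the downcasting hook.
pub trait Scheduler {
    fn as_any(&self) -> &dyn Any;
}

/// Stand-ins for the test and production schedulers.
pub struct TestScheduler {
    pub seed: u64,
}
pub struct RealScheduler;

impl Scheduler for TestScheduler {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl Scheduler for RealScheduler {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Test-only features are reachable only when the concrete type matches.
pub fn test_seed(scheduler: &Arc<dyn Scheduler>) -> Option<u64> {
    scheduler
        .as_any()
        .downcast_ref::<TestScheduler>()
        .map(|test| test.seed)
}
```

Each implementor returns `self`, so `downcast_ref` succeeds only for the concrete type behind the trait object. A production scheduler yields `None`, and the test-only branch is skipped.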

This implementation provides the unified test core, enabling both GPUI's deterministic UI testing and Cloud's session-aware simulation in a single struct.