# TestScheduler Design Details

This document expands on the [Unified Scheduler Architecture Plan](2025-08-31_15-47-17_unified-scheduler-architecture.md) with a detailed design and a complete implementation of the TestScheduler. It assumes familiarity with the broader architecture, including the shared `Scheduler` trait, domain separation (GPUI vs. Cloud), and multi-threading test scenarios.

## Overview

The TestScheduler is the **single concrete test implementation** of the `Scheduler` trait (see Section 3: Scheduler Trait Definition in the original plan). It serves as the unified core for all test scenarios, enabling:

- **GPUI Testing**: Deterministic UI scheduling with task labels, deprioritization, main thread isolation, and tick-based execution (see Section 4.1: GPUI Integration in the original plan).
- **Cloud Testing**: Session coordination, time-range delays, wait-until task tracking, and cleanup validation (see Section 5: Cloud Integration in the original plan).
- **Unified Testing**: Shared across test threads for client-cloud interactions, seeded randomness, and task lifecycle management.

There is **no separate TestScheduler trait**: all test-specific methods live directly on the TestScheduler struct for simplicity, since it is the only implementation in the test context.

## Design Principles

- **Minimal Complexity**: No unnecessary traits or abstractions; direct methods for test features.
- **Merged Capabilities**: Combines GPUI queues with Cloud session logic in a single state machine.
- **Determinism**: Always seeded; randomized task ordering is configurable.
- **Multi-threading Ready**: `Arc` and `Mutex` for shared access in collaborative tests.
- **Domain Wrappers**: GPUI/Cloud test code wraps this core for specific APIs (e.g., GPUI's BackgroundExecutor).
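The determinism principle can be illustrated in isolation. The following is a minimal sketch, not the scheduler's actual code: it uses a small hand-written SplitMix64 generator as a std-only stand-in for the seeded `ChaCha8Rng`, and the names `SplitMix64` and `shuffled_order` are hypothetical. It shows that a task order shuffled from a fixed seed is reproducible across runs.

```rust
/// Tiny SplitMix64 generator; a std-only stand-in for the seeded
/// `ChaCha8Rng` used by the scheduler (an assumption of this sketch).
struct SplitMix64(u64);

impl SplitMix64 {
    fn next_u64(&mut self) -> u64 {
        self.0 = self.0.wrapping_add(0x9E37_79B9_7F4A_7C15);
        let mut z = self.0;
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    }
}

/// Returns a permutation of `0..task_count` produced by a Fisher-Yates
/// shuffle driven entirely by the seeded generator.
pub fn shuffled_order(seed: u64, task_count: usize) -> Vec<usize> {
    let mut rng = SplitMix64(seed);
    let mut order: Vec<usize> = (0..task_count).collect();
    for i in (1..order.len()).rev() {
        // Pick an index in 0..=i; modulo bias is irrelevant for a sketch.
        let j = (rng.next_u64() % (i as u64 + 1)) as usize;
        order.swap(i, j);
    }
    order
}
```

Calling `shuffled_order` twice with the same seed yields the same order, which is the property that lets a failing test schedule be replayed from its seed.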

## Complete TestScheduler Implementation

```rust
use std::collections::{HashMap, HashSet, VecDeque};
use std::future::Future;
use std::ops::RangeInclusive;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};

use anyhow::{anyhow, Result};
use arbitrary::Arbitrary; // Assumption: the `Arbitrary` derive comes from the `arbitrary` crate.
use async_task::Runnable;
use chrono::{DateTime, Utc};
use futures::channel::oneshot;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;

// Core types (shared with main crate)
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskId(usize);

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskLabel(usize);

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
pub struct SessionId(usize);

pub enum ThreadTarget { Main, Background }

// `Clone` and `Debug` are required because `SchedulerConfig` derives both.
#[derive(Clone, Debug)]
pub enum VariationSource {
    /// Seeded randomness for reproducible probabilistic scheduling.
    Seeded(u64),
    /// Fuzzed inputs for deterministic exploration of variations.
    Fuzzed(SchedulerFuzzInput),
}

#[derive(Clone, Debug)]
pub struct SchedulerConfig {
    /// Whether to randomize task ordering (e.g., queue shuffling); defaults to true for coverage.
    pub randomize_order: bool,
    /// Max steps before panic (for deadlock detection).
    pub max_steps: usize,
    /// Whether to log operations (for debugging).
    pub log_operations: bool,
    /// The source of non-deterministic decisions (mutually exclusive modes).
    pub variation_source: VariationSource,
}

impl Default for SchedulerConfig {
    fn default() -> Self {
        Self {
            randomize_order: true,
            max_steps: 10_000,
            log_operations: false,
            variation_source: VariationSource::Seeded(0),
        }
    }
}

impl SchedulerConfig {
    /// Create a seeded config (randomization enabled by default).
    pub fn seeded(seed: u64) -> Self {
        Self {
            variation_source: VariationSource::Seeded(seed),
            ..Default::default()
        }
    }

    /// Create a fuzzed config (randomization enabled by default).
    pub fn fuzzed(fuzz_inputs: SchedulerFuzzInput) -> Self {
        Self {
            variation_source: VariationSource::Fuzzed(fuzz_inputs),
            ..Default::default()
        }
    }

    /// For isolation: Disable randomization for deterministic testing.
    pub fn deterministic(seed: u64) -> Self {
        Self {
            randomize_order: false,
            variation_source: VariationSource::Seeded(seed),
            ..Default::default()
        }
    }
}

#[derive(Arbitrary, Debug, Clone)]
pub struct SchedulerFuzzInput {
    queue_selections: Vec<u8>,
    task_indices: Vec<u32>,
    delay_bools: Vec<bool>,
    block_ticks: Vec<usize>,
}

pub struct Task<T> {
    id: TaskId,
    rx: oneshot::Receiver<T>,
    scheduler: Arc<TestScheduler>,
}

impl<T> Task<T> {
    /// Identifier assigned by the scheduler (used by the Cloud usage example).
    pub fn id(&self) -> TaskId {
        self.id
    }
}

impl<T> Future for Task<T> {
    type Output = T;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Delegate to the receiver so it registers the waker itself; waking on
        // every pending poll would turn this into a busy loop.
        match Pin::new(&mut self.rx).poll(cx) {
            Poll::Ready(Ok(val)) => Poll::Ready(val),
            // Sender dropped without a value (task cancelled): stay pending.
            Poll::Ready(Err(_)) | Poll::Pending => Poll::Pending,
        }
    }
}

// Internal state
struct TaskInfo {
    label: Option<TaskLabel>,
    session_id: Option<SessionId>,
    waker: Option<Waker>,
    future: Option<Pin<Box<dyn Future<Output = ()>>>>,
    state: TaskState,
    delay_range: Option<(Instant, Instant)>,
}

#[derive(PartialEq)]
enum TaskState { Pending, Running, Completed }

struct WorkerSession {
    spawned_tasks: HashSet<TaskId>,
    wait_until_tasks: HashSet<TaskId>,
}

struct SchedulerState {
    rng: ChaCha8Rng,
    randomize_order: bool,
    max_steps: usize,
    current_time: Instant,
    next_task_id: AtomicUsize,
    tasks: HashMap<TaskId, TaskInfo>,
    sessions: HashMap<SessionId, WorkerSession>,
    current_session: Option<SessionId>,
    ready_queue: VecDeque<TaskId>,
    deprioritized_queue: VecDeque<TaskId>,
    main_thread_queue: VecDeque<TaskId>,
    delayed: Vec<(Instant, TaskId)>,
    deprioritized_labels: HashSet<TaskLabel>,
    block_tick_range: RangeInclusive<usize>,
    // `Parker`/`Unparker` live in the `parking` crate, not `parking_lot`.
    parker: parking::Parker,
    unparker: parking::Unparker,
    parking_allowed: AtomicBool,
    execution_history: Vec<String>,
    fuzz_inputs: Option<SchedulerFuzzInput>,
}

// Concrete implementation
pub struct TestScheduler {
    state: Arc<Mutex<SchedulerState>>,
}

impl TestScheduler {
    /// Primary constructor: Create a scheduler from full configuration.
    pub fn new(config: SchedulerConfig) -> Arc<Self> {
        // `pair()` is provided by the `parking` crate.
        let (parker, unparker) = parking::pair();
        let (rng, fuzz_inputs) = match config.variation_source {
            VariationSource::Seeded(seed) => (ChaCha8Rng::seed_from_u64(seed), None),
            VariationSource::Fuzzed(inputs) => (ChaCha8Rng::seed_from_u64(0), Some(inputs)),
        };
        let state = SchedulerState {
            rng,
            randomize_order: config.randomize_order,
            max_steps: config.max_steps,
            current_time: Instant::now(),
            next_task_id: AtomicUsize::new(1),
            tasks: HashMap::new(),
            sessions: HashMap::new(),
            current_session: None,
            ready_queue: VecDeque::new(),
            deprioritized_queue: VecDeque::new(),
            main_thread_queue: VecDeque::new(),
            delayed: Vec::new(),
            deprioritized_labels: HashSet::new(),
            block_tick_range: 0..=1000,
            parker,
            unparker,
            parking_allowed: AtomicBool::new(false),
            execution_history: Vec::new(),
            fuzz_inputs,
        };
        Arc::new(Self { state: Arc::new(Mutex::new(state)) })
    }

    /// Convenience helper: Create a seeded scheduler (randomization enabled by default).
    pub fn from_seed(seed: u64) -> Arc<Self> {
        Self::new(SchedulerConfig::seeded(seed))
    }

    /// Convenience helper: Create a scheduler driven by fuzzed inputs.
    /// Use for fuzzing with Bolero.
    pub fn from_fuzz(fuzz_inputs: SchedulerFuzzInput) -> Arc<Self> {
        Self::new(SchedulerConfig::fuzzed(fuzz_inputs))
    }

    // Test-specific methods (direct on impl, no trait)
    pub fn assign_task_id(&self) -> TaskId {
        TaskId(self.state.lock().unwrap().next_task_id.fetch_add(1, Ordering::SeqCst))
    }

    pub fn deprioritize(&self, label: TaskLabel) {
        let mut state = self.state.lock().unwrap();
        state.deprioritized_labels.insert(label);
    }

    pub fn is_task_running(&self, task_id: TaskId) -> bool {
        let state = self.state.lock().unwrap();
        state.tasks.get(&task_id).map_or(false, |t| t.state == TaskState::Running)
    }

    pub fn tick(&self, background_only: bool) -> bool {
        self.tick_internal(background_only)
    }

    fn tick_internal(&self, background_only: bool) -> bool {
        // Phase 1 (lock held): promote due timers, select a task, and take its
        // future and waker out of the task table.
        let (task_id, mut future, waker) = {
            let mut guard = self.state.lock().unwrap();
            // Reborrow so borrows can be split across individual fields.
            let state = &mut *guard;

            let (now, randomize) = (state.current_time, state.randomize_order);
            let (rng, ready_queue) = (&mut state.rng, &mut state.ready_queue);
            state.delayed.retain(|&(time, task_id)| {
                if time <= now && (!randomize || rng.gen_bool(0.5)) {
                    ready_queue.push_back(task_id);
                    false
                } else {
                    true
                }
            });

            let mut queues = vec![&mut state.ready_queue, &mut state.deprioritized_queue];
            if !background_only {
                queues.insert(0, &mut state.main_thread_queue);
            }
            let mut available: Vec<usize> = queues
                .iter()
                .enumerate()
                .filter(|(_, q)| !q.is_empty())
                .map(|(i, _)| i)
                .collect();
            if available.is_empty() {
                return false;
            }
            if state.randomize_order {
                available.shuffle(&mut state.rng);
            }
            let task_id = queues[available[0]].pop_front().unwrap();

            let Some(task) = state.tasks.get_mut(&task_id) else {
                return true; // Stale id: a queue entry was consumed, nothing to poll.
            };
            task.state = TaskState::Running;
            match (task.future.take(), task.waker.clone()) {
                (Some(future), Some(waker)) => (task_id, future, waker),
                (future, _) => {
                    // Not pollable yet: restore the future and report progress.
                    task.future = future;
                    task.state = TaskState::Pending;
                    return true;
                }
            }
        }; // Lock dropped here

        // Phase 2 (no lock held): poll, so the future may call back into the scheduler.
        let mut context = Context::from_waker(&waker);
        let poll_result = future.as_mut().poll(&mut context);

        // Phase 3 (lock held): record the outcome.
        let mut guard = self.state.lock().unwrap();
        let state = &mut *guard;
        if let Some(task) = state.tasks.get_mut(&task_id) {
            if poll_result.is_ready() {
                task.state = TaskState::Completed;
                state.execution_history.push(format!("Ticked task {}", task_id.0));
            } else {
                // Still pending: put the future back for a later tick.
                task.future = Some(future);
                task.state = TaskState::Pending;
            }
        }

        true
    }

    pub fn advance_clock(&self, duration: Duration) {
        let mut state = self.state.lock().unwrap();
        state.current_time += duration;
    }

    pub fn run_until_parked(&self) {
        while self.tick(false) {}
    }

    // Cloud session methods
    pub fn create_session(&self) -> SessionId {
        let mut state = self.state.lock().unwrap();
        let id = SessionId(state.sessions.len());
        state.sessions.insert(id, WorkerSession { spawned_tasks: HashSet::new(), wait_until_tasks: HashSet::new() });
        id
    }

    pub fn set_current_session(&self, session_id: Option<SessionId>) {
        let mut state = self.state.lock().unwrap();
        state.current_session = session_id;
    }

    pub fn get_current_session(&self) -> Option<SessionId> {
        self.state.lock().unwrap().current_session
    }

    pub fn track_task_for_session(&self, task_id: TaskId, session_id: SessionId) {
        let mut state = self.state.lock().unwrap();
        if let Some(session) = state.sessions.get_mut(&session_id) {
            session.spawned_tasks.insert(task_id);
        }
    }

    pub fn add_wait_until_task(&self, session_id: SessionId, task_id: TaskId) {
        let mut state = self.state.lock().unwrap();
        if let Some(session) = state.sessions.get_mut(&session_id) {
            session.wait_until_tasks.insert(task_id);
        }
    }

    /// Fails if the session spawned tasks that are neither completed nor
    /// registered through `add_wait_until_task`.
    pub fn validate_session_cleanup(&self, session_id: SessionId) -> Result<()> {
        let state = self.state.lock().unwrap();
        if let Some(session) = state.sessions.get(&session_id) {
            let dangling: Vec<_> = session.spawned_tasks.iter()
                .filter(|&&tid| state.tasks.get(&tid).map_or(false, |t| t.state != TaskState::Completed))
                .filter(|&&tid| !session.wait_until_tasks.contains(&tid))
                .cloned()
                .collect();
            if !dangling.is_empty() {
                return Err(anyhow!("{} dangling tasks", dangling.len()));
            }
        }
        Ok(())
    }

    // Other test methods (e.g., GPUI block simulation)
    pub fn gen_block_on_ticks(&self) -> usize {
        // The RNG needs mutable access, and the field is named `block_tick_range`.
        let mut guard = self.state.lock().unwrap();
        let state = &mut *guard;
        state.rng.gen_range(state.block_tick_range.clone())
    }
}
```
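The lock handling in `tick_internal` matters because `std::sync::Mutex` is not reentrant: a future that calls back into the scheduler while the lock is held would deadlock or panic. The sketch below isolates that pattern under simplifying assumptions. `MiniScheduler`, `MiniState`, and `NoopWake` are hypothetical names, there is a single queue, and pending futures are stored but not requeued because the waker does nothing.

```rust
use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Wake, Waker};

type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Waker that does nothing; sufficient for a manually ticked sketch.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

#[derive(Default)]
struct MiniState {
    ready: VecDeque<usize>,
    futures: HashMap<usize, BoxedFuture>,
    next_id: usize,
}

#[derive(Default)]
pub struct MiniScheduler {
    state: Mutex<MiniState>,
}

impl MiniScheduler {
    pub fn spawn(&self, fut: impl Future<Output = ()> + Send + 'static) -> usize {
        let mut state = self.state.lock().unwrap();
        let id = state.next_id;
        state.next_id += 1;
        state.futures.insert(id, Box::pin(fut));
        state.ready.push_back(id);
        id
    }

    /// Polls one ready task; returns false when nothing is runnable.
    pub fn tick(&self) -> bool {
        // Phase 1: select a task and take its future while holding the lock.
        let (id, mut fut) = {
            let mut state = self.state.lock().unwrap();
            let Some(id) = state.ready.pop_front() else { return false };
            let Some(fut) = state.futures.remove(&id) else { return true };
            (id, fut)
        }; // guard dropped here

        // Phase 2: poll with no lock held, so the future may call `spawn`.
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        let result = fut.as_mut().poll(&mut cx);

        // Phase 3: re-lock only to store a still-pending future.
        if result.is_pending() {
            self.state.lock().unwrap().futures.insert(id, fut);
        }
        true
    }
}
```

A task that spawns another task from inside its own poll exercises the pattern: with the guard released before polling, the nested `spawn` acquires the mutex normally and the inner task runs on a later tick.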

## GPUI Usage Example

GPUI wraps the TestScheduler to maintain its existing API:

```rust
pub struct BackgroundExecutor {
    scheduler: Arc<dyn Scheduler>,  // TestScheduler implements Scheduler
}

impl BackgroundExecutor {
    pub fn spawn_labeled<R>(
        &self,
        label: TaskLabel,
        future: impl Future<Output = R> + Send + 'static
    ) -> Task<R> {
        // Direct downcast for GPUI test features
        if let Some(test_sched) = self.scheduler.as_any().downcast_ref::<TestScheduler>() {
            test_sched.deprioritize(label);  // Use test method directly
        }
        self.scheduler.spawn(future)
    }

    pub fn deprioritize(&self, label: TaskLabel) {
        if let Some(test_sched) = self.scheduler.as_any().downcast_ref::<TestScheduler>() {
            test_sched.deprioritize(label);
        }
    }
}
```
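The downcast above only works if `Scheduler` exposes an `as_any` accessor. The following sketch shows that pattern on its own. The one-method `Scheduler` trait, the simplified `TestScheduler`, the `ProductionScheduler` stand-in, and the `deprioritize_if_test` helper are all hypothetical and exist only for illustration; the real trait and struct carry far more.

```rust
use std::any::Any;
use std::sync::Mutex;

/// Hypothetical minimal trait; only the downcasting hook is shown.
pub trait Scheduler {
    fn as_any(&self) -> &dyn Any;
}

/// Simplified stand-in for the test scheduler.
pub struct TestScheduler {
    pub deprioritized: Mutex<Vec<usize>>,
}

/// Stand-in for any non-test implementation.
pub struct ProductionScheduler;

impl Scheduler for TestScheduler {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl Scheduler for ProductionScheduler {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Applies a test-only feature when the scheduler is a `TestScheduler`
/// and reports whether the downcast succeeded.
pub fn deprioritize_if_test(scheduler: &dyn Scheduler, label: usize) -> bool {
    match scheduler.as_any().downcast_ref::<TestScheduler>() {
        Some(test) => {
            test.deprioritized.lock().unwrap().push(label);
            true
        }
        None => false,
    }
}
```

With a production scheduler the helper is a no-op, so wrapper code can call it unconditionally without a separate test-only code path.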

## Cloud Usage Example

Cloud wraps the TestScheduler for session coordination:

```rust
impl SimulatedExecutionContext {
    pub fn wait_until(&self, future: LocalBoxFuture<'static, Result<()>>) -> Result<()> {
        let task = self.scheduler.spawn(async move { future.await })?;

        // Direct use of TestScheduler methods
        let scheduler = self
            .scheduler
            .as_any()
            .downcast_ref::<TestScheduler>()
            .expect("SimulatedExecutionContext requires a TestScheduler");
        if let Some(session_id) = scheduler.get_current_session() {
            scheduler.track_task_for_session(task.id(), session_id);
            scheduler.add_wait_until_task(session_id, task.id());
        }

        Ok(())
    }
}
```
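To make the effect of `wait_until` registration concrete, here is a small std-only model of the cleanup rule that `validate_session_cleanup` applies. It is a sketch under simplifying assumptions: task ids are plain `usize` values, and the `dangling_tasks` function is a hypothetical helper, not part of the scheduler.

```rust
use std::collections::{HashMap, HashSet};

#[derive(PartialEq)]
pub enum TaskState {
    Pending,
    Completed,
}

/// Returns the ids that would fail session cleanup: spawned in the session,
/// known to the scheduler, not completed, and not registered via `wait_until`.
pub fn dangling_tasks(
    spawned: &HashSet<usize>,
    wait_until: &HashSet<usize>,
    states: &HashMap<usize, TaskState>,
) -> Vec<usize> {
    let mut dangling: Vec<usize> = spawned
        .iter()
        .copied()
        .filter(|id| states.get(id).map_or(false, |s| *s != TaskState::Completed))
        .filter(|id| !wait_until.contains(id))
        .collect();
    dangling.sort_unstable(); // stable order for readable error output
    dangling
}
```

For a session that spawned tasks 1, 2, and 3, where 1 completed and 2 was registered through `wait_until`, only task 3 counts as dangling.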

## Key Dependencies and Assumptions

- **std::sync**: For `Arc` and `Mutex` (not `RwLock`); the implementation locks with `.lock().unwrap()`.
- **parking**: For `Parker`/`Unparker` (`parking_lot` does not provide these types).
- **async_task**: For Runnable wrapping.
- **rand_chacha**: For seeded RNG.
- **futures**: For channels.
- **chrono**: For time ranges (optional).
- **anyhow**: For errors.
- **arbitrary** (assumed): For the `Arbitrary` derive on `SchedulerFuzzInput`.

The usage examples downcast through `as_any`, which this design assumes the `Scheduler` trait does not yet provide; add `fn as_any(&self) -> &dyn std::any::Any;` to the trait to support them.

This implementation provides the complete unified test core, enabling both GPUI's deterministic UI testing and Cloud's session-aware simulation in a single struct of a few hundred lines.