2025-08-31_15-47-17_unified-scheduler-architecture.md

# Unified Scheduler Architecture - Layered Design

## Overview

A clean layered architecture where:
- **Core**: Basic scheduling interface (`Scheduler` trait) + test-enhanced concrete impl (`TestScheduler`)
- **GPUI**: Uses trait objects for production safety, test features via `TestScheduler`
- **Cloud**: Session wrapper uses `TestScheduler` for session coordination

Key design principles:
- Main `Scheduler` trait has only essential methods (no test pollution)
- Test-specific features (deprioritization, task tracking) are `TestScheduler`-specific
- Production schedulers implement minimal interface
- Cloud requires `TestScheduler` for session features

## Core Architecture

```
┌─────────────────────────────────────────┐
│            Shared Crate                 │
├─────────────────────────────────────────┤
│ Scheduler trait:                        │
│ - spawn() - core Send futures           │
│ - spawn_foreground() - non-Send         │
│ - Platform integration (park, now)      │
│                                         │
│ TestScheduler:                          │
│ - Implements Scheduler + test features  │
│ - deprioritize() - test-only method     │
│ - spawn_labeled() - labels for testing  │
│                                         │
│ GcdScheduler/ThreadPoolScheduler:       │
│ - Minimal Scheduler implementations     │
└─────────────────────────────────────────┘
                    ▲
          ┌─────────┼─────────┐
          │         │         │
┌─────────┴────┐  ┌─┴─────────┴────┐
│   GPUI       │  │     Cloud      │
│ Uses trait   │  │ CloudSimulated │
│ objects +    │  │ uses Test-     │
│ TestScheduler│  │ Scheduler      │
└──────────────┘  └────────────────┘
```

## Scheduler Trait Definition

```rust
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, Instant};

// `Task`, `TaskLabel`, and `Unparker` are assumed to be defined elsewhere in the shared crate.
pub trait Scheduler: Send + Sync {
    /// Spawn a `Send` future (core functionality)
    fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
    where R: Send + 'static;

    /// Spawn a `Send` future with a label (the default ignores the label)
    fn spawn_labeled<R>(
        &self,
        _label: TaskLabel,
        future: impl Future<Output = R> + Send + 'static
    ) -> Task<R>
    where R: Send + 'static {
        // Default: ignore label and just spawn normally
        self.spawn(future)
    }

    /// Spawn a non-`Send` future on the main thread (optional, panics by default)
    fn spawn_foreground<R>(&self, _future: impl Future<Output = R> + 'static) -> Task<R>
    where R: 'static {
        panic!("spawn_foreground not supported by this scheduler");
    }

    /// Platform integration methods
    fn park(&self, _timeout: Option<Duration>) -> bool { false }
    // Placeholder default: assumes an `Unparker` can be built from a no-op callback.
    fn unparker(&self) -> Unparker { Arc::new(|_| {}).into() }
    fn is_main_thread(&self) -> bool;
    fn now(&self) -> Instant;
}
```

**Explanation:**
- Core trait has only essential methods
- No test-specific methods (`deprioritize()` stays off the main trait)
- `spawn_labeled()` is on the trait so callers can pass labels everywhere; the default ignores them
- Production schedulers implement minimal interface
- Test features are concrete `TestScheduler` methods
- Generic methods such as `spawn<R>` cannot be called through `dyn Scheduler`, so the trait-object usage in GPUI needs a type-erased entry point underneath them
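
One possible way to reconcile generic `spawn` methods with `Arc<dyn Scheduler>` is to keep the trait object-safe with a single type-erased method and layer the generic API on top. The sketch below is only an illustration under stated assumptions: `ErasedScheduler`, `BoxedFuture`, `QueueScheduler`, and the free function `spawn` are hypothetical names, and an `mpsc::Receiver` stands in for `Task<R>` so the example needs nothing beyond the standard library.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{mpsc, Arc, Mutex};
use std::task::{Context, Wake, Waker};

/// Type-erased unit of work (hypothetical alias, not part of the design above).
type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// Object-safe core: no generic methods, so `Arc<dyn ErasedScheduler>` is allowed.
pub trait ErasedScheduler: Send + Sync {
    fn spawn_boxed(&self, future: BoxedFuture);
}

/// Generic convenience layer built on the object-safe core.
/// A channel stands in for `Task<R>` to keep the sketch dependency-free.
pub fn spawn<R: Send + 'static>(
    scheduler: &dyn ErasedScheduler,
    future: impl Future<Output = R> + Send + 'static,
) -> mpsc::Receiver<R> {
    let (tx, rx) = mpsc::channel();
    scheduler.spawn_boxed(Box::pin(async move {
        // Ignore the send error: the caller may have dropped the receiver.
        let _ = tx.send(future.await);
    }));
    rx
}

/// Waker that does nothing; enough for futures that never actually wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy scheduler that queues futures and polls them on demand.
#[derive(Default)]
pub struct QueueScheduler {
    queue: Mutex<Vec<BoxedFuture>>,
}

impl ErasedScheduler for QueueScheduler {
    fn spawn_boxed(&self, future: BoxedFuture) {
        self.queue.lock().unwrap().push(future);
    }
}

impl QueueScheduler {
    /// Poll queued futures until none remain (busy-polls any that stay pending).
    pub fn run_until_idle(&self) {
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        loop {
            // Take the batch out so the lock is not held while polling.
            let mut batch = std::mem::take(&mut *self.queue.lock().unwrap());
            if batch.is_empty() {
                break;
            }
            batch.retain_mut(|future| future.as_mut().poll(&mut cx).is_pending());
            self.queue.lock().unwrap().append(&mut batch);
        }
    }
}
```

With this shape, an executor could hold `Arc<dyn ErasedScheduler>` and still offer a generic `spawn`, at the cost of one boxed allocation per task.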

## Task<T> Definition

```rust
use std::num::NonZeroUsize;
use std::panic::Location;

#[derive(Debug)]
pub struct Task<T> {
    inner: TaskState<T>,
    id: TaskId,  // Mandatory for coordination
    metadata: TaskMetadata,
}

impl<T> Task<T> {
    /// Identifier used by wrappers (e.g. Cloud) to associate the task with a session
    pub fn id(&self) -> TaskId { self.id }
}

// `Default` lets production schedulers write `TaskMetadata::default()`
#[derive(Debug, Default)]
pub struct TaskMetadata {
    label: Option<TaskLabel>,        // GPUI test identification
    session: Option<SessionId>,      // Cloud session association
    spawn_location: Option<&'static Location<'static>>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskId(pub usize);

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskLabel(NonZeroUsize);
```

**Explanation:**
- Mandatory `TaskId` for session coordination
- Optional metadata for GPUI labels and Cloud sessions
- `Task` implements `Future` directly
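
To make the last point concrete, here is a minimal sketch of a task handle that implements `Future` directly. It is an assumption-laden illustration, not the design's actual type: the `TaskState` variants, the global `NEXT_TASK_ID` counter, the `ready`/`from_future` constructors, and the `poll_once` helper are all hypothetical, and a boxed future stands in for the handle a real executor would return.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskId(pub usize);

/// Hypothetical global id allocator; the design allocates ids per scheduler.
static NEXT_TASK_ID: AtomicUsize = AtomicUsize::new(0);

/// Either an already-computed value or a boxed future standing in for a spawned handle.
enum TaskState<T> {
    Ready(Option<T>),
    Spawned(Pin<Box<dyn Future<Output = T>>>),
}

pub struct Task<T> {
    inner: TaskState<T>,
    id: TaskId,
}

// `Task` never hands out a pinned reference to `T`, so it can be `Unpin` for every `T`.
impl<T> Unpin for Task<T> {}

impl<T> Task<T> {
    fn new(inner: TaskState<T>) -> Self {
        let id = TaskId(NEXT_TASK_ID.fetch_add(1, Ordering::SeqCst));
        Task { inner, id }
    }

    pub fn ready(value: T) -> Self {
        Self::new(TaskState::Ready(Some(value)))
    }

    pub fn from_future(future: impl Future<Output = T> + 'static) -> Self {
        Self::new(TaskState::Spawned(Box::pin(future)))
    }

    pub fn id(&self) -> TaskId {
        self.id
    }
}

impl<T> Future for Task<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        match &mut self.get_mut().inner {
            TaskState::Ready(value) => {
                Poll::Ready(value.take().expect("Task polled after completion"))
            }
            TaskState::Spawned(future) => future.as_mut().poll(cx),
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Poll a task once with a no-op waker (helper for the usage below).
pub fn poll_once<T>(task: &mut Task<T>) -> Poll<T> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(task).poll(&mut cx)
}
```

Because the handle is itself a future, callers can `.await` it without an extra adapter, while `id()` stays available for session bookkeeping.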

## TestScheduler (Concrete Implementation)

```rust
use std::collections::{HashMap, HashSet, VecDeque};
use std::future::Future;
use std::panic::Location;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_task::Runnable;

pub struct TestScheduler {
    // Shared with the schedule callbacks, which must be `'static`. A `Mutex` is used
    // instead of a `RefCell` because `Scheduler` requires `Send + Sync`.
    inner: Arc<Mutex<TestSchedulerInner>>,
    next_task_id: AtomicUsize,
}

/// Tracking status for a spawned task (distinct from the `TaskState<T>` inside `Task<T>`)
enum TaskStatus {
    Running,
}

struct TestSchedulerInner {
    tasks: HashMap<TaskId, TaskStatus>,
    deprioritized_labels: HashSet<TaskLabel>,  // Test-specific state
    ready_queue: VecDeque<Runnable>,
    deprioritized_queue: VecDeque<Runnable>,
    delayed: Vec<(Instant, Runnable)>,
    parker: Parker,
    is_main_thread: bool,
    now: Instant,
}

impl Scheduler for TestScheduler {
    fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
    where R: Send + 'static {
        let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst));
        let task = self.create_task(future, task_id, None);
        self.inner.lock().unwrap().tasks.insert(task_id, TaskStatus::Running);
        task
    }

    fn spawn_labeled<R>(
        &self,
        label: TaskLabel,
        future: impl Future<Output = R> + Send + 'static
    ) -> Task<R>
    where R: Send + 'static {
        let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst));
        // The schedule callback applies deprioritization if the label is registered
        let task = self.create_task(future, task_id, Some(label));
        self.inner.lock().unwrap().tasks.insert(task_id, TaskStatus::Running);
        task
    }

    fn spawn_foreground<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
    where R: 'static {
        assert!(self.is_main_thread(), "spawn_foreground called off main thread");
        let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst));
        // `create_local_task` (not shown) is assumed to mirror `create_task` for non-`Send` futures
        let task = self.create_local_task(future, task_id);
        self.inner.lock().unwrap().tasks.insert(task_id, TaskStatus::Running);
        task
    }

    fn is_main_thread(&self) -> bool { self.inner.lock().unwrap().is_main_thread }
    fn now(&self) -> Instant { self.inner.lock().unwrap().now }
    fn park(&self, timeout: Option<Duration>) -> bool {
        self.inner.lock().unwrap().parker.park_timeout(timeout.unwrap_or(Duration::MAX))
    }

    fn unparker(&self) -> Unparker {
        self.inner.lock().unwrap().parker.unparker()
    }
}

// Test-specific methods (NOT on main trait)
impl TestScheduler {
    pub fn deprioritize(&self, label: TaskLabel) {
        self.inner.lock().unwrap().deprioritized_labels.insert(label);
    }

    /// Entries must be removed from `tasks` on completion (not shown) for this to be accurate
    pub fn is_task_running(&self, task_id: TaskId) -> bool {
        self.inner.lock().unwrap().tasks.contains_key(&task_id)
    }

    // `#[track_caller]` makes `Location::caller()` report the call site; the public
    // `spawn*` methods need the same attribute for it to point at user code.
    #[track_caller]
    fn create_task<R>(
        &self,
        future: impl Future<Output = R> + Send + 'static,
        task_id: TaskId,
        label: Option<TaskLabel>,
    ) -> Task<R>
    where R: Send + 'static {
        // Clone the shared state: the callback cannot borrow `self`
        let inner = Arc::clone(&self.inner);
        let (runnable, inner_task) = async_task::spawn(future, move |runnable| {
            // Schedule to appropriate queue based on label
            inner.lock().unwrap().schedule_runnable(runnable, label);
        });
        runnable.schedule();

        Task {
            inner: TaskState::Spawned(inner_task),
            id: task_id,
            metadata: TaskMetadata {
                label,
                session: None,
                spawn_location: Some(Location::caller()),
            },
        }
    }
}

impl TestSchedulerInner {
    fn schedule_runnable(&mut self, runnable: Runnable, label: Option<TaskLabel>) {
        // TestScheduler-specific scheduling logic for labeled tasks
        let deprioritized = label.map_or(false, |label| self.deprioritized_labels.contains(&label));
        if deprioritized {
            // Put in deprioritized queue for test determinism
            self.deprioritized_queue.push_back(runnable);
        } else {
            // Queue normally; calling `runnable.schedule()` here would re-enter this callback
            self.ready_queue.push_back(runnable);
        }
    }
}
```

**Explanation:**
- `deprioritize()` is a `TestScheduler`-specific method (not on main trait)
- `spawn_labeled()` overrides the trait default to record the label and route deprioritized labels to a separate queue
- `is_task_running()` provides task status for Cloud session validation
- Test-specific state stays in `TestScheduler`
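
The deprioritization behavior can be illustrated in isolation. The sketch below assumes that "deprioritized" means "run only when no other work is ready", which is one plausible reading of the design; `LabeledQueues`, `enqueue`, `tick`, and `run_demo` are hypothetical names, and plain closures stand in for `Runnable`s.

```rust
use std::cell::RefCell;
use std::collections::{HashSet, VecDeque};
use std::rc::Rc;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskLabel(pub usize);

/// Closures stand in for `Runnable`s in this sketch.
type Job = Box<dyn FnOnce()>;

#[derive(Default)]
pub struct LabeledQueues {
    deprioritized_labels: HashSet<TaskLabel>,
    ready: VecDeque<Job>,
    deprioritized: VecDeque<Job>,
}

impl LabeledQueues {
    pub fn deprioritize(&mut self, label: TaskLabel) {
        self.deprioritized_labels.insert(label);
    }

    /// Route the job by label: registered labels go to the low-priority queue.
    pub fn enqueue(&mut self, label: Option<TaskLabel>, job: impl FnOnce() + 'static) {
        let is_deprioritized =
            label.map_or(false, |label| self.deprioritized_labels.contains(&label));
        if is_deprioritized {
            self.deprioritized.push_back(Box::new(job));
        } else {
            self.ready.push_back(Box::new(job));
        }
    }

    /// Run one job, preferring the ready queue; returns false when both queues are empty.
    pub fn tick(&mut self) -> bool {
        match self.ready.pop_front().or_else(|| self.deprioritized.pop_front()) {
            Some(job) => {
                job();
                true
            }
            None => false,
        }
    }
}

/// Enqueue a, b (deprioritized), c and report the order in which they ran.
pub fn run_demo() -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut queues = LabeledQueues::default();
    let slow = TaskLabel(1);
    queues.deprioritize(slow);
    for (label, name) in [(None, "a"), (Some(slow), "b"), (None, "c")] {
        let log = Rc::clone(&log);
        queues.enqueue(label, move || {
            log.borrow_mut().push(name);
        });
    }
    while queues.tick() {}
    let order = log.borrow().clone();
    order
}
```

In `run_demo`, the job labeled `slow` is enqueued second but runs last, which is the ordering a test would rely on when it deprioritizes a label.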

## Production Schedulers

```rust
use std::ffi::c_void;
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;

pub struct GcdScheduler {
    main_queue: dispatch_queue_t,
    background_queue: dispatch_queue_t,
    task_counter: AtomicUsize,
}

impl Scheduler for GcdScheduler {
    fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
    where R: Send + 'static {
        let task_id = TaskId(self.task_counter.fetch_add(1, Ordering::SeqCst));
        // Copy the queue handle so the `'static` schedule callback does not borrow `self`.
        // A raw `dispatch_queue_t` is not `Send`, so a real implementation needs a
        // `Send + Sync` wrapper around the handle.
        let queue = self.background_queue;
        let (runnable, task) = async_task::spawn(future, move |runnable| {
            // `trampoline` (not shown) is assumed to be an `extern "C" fn(*mut c_void)`
            // that rebuilds the `Runnable` from the raw pointer and runs it.
            unsafe {
                dispatch_async_f(queue, runnable.into_raw().as_ptr() as *mut c_void, Some(trampoline));
            }
        });
        runnable.schedule();

        Task {
            inner: TaskState::Spawned(task),
            id: task_id,
            metadata: TaskMetadata::default(),
        }
    }

    fn is_main_thread(&self) -> bool {
        // macOS-specific main thread detection; `isMainThread` returns an Objective-C `BOOL`
        let is_main_thread: BOOL = unsafe { msg_send![class!(NSThread), isMainThread] };
        is_main_thread == YES
    }

    fn now(&self) -> Instant { Instant::now() }
}
```

**Explanation:**
- Production schedulers implement only core `Scheduler` trait
- No test-specific methods (`deprioritize()` stays off the main trait)
- Minimal implementation, no task tracking overhead
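
The document names a `ThreadPoolScheduler` for Linux and Windows without showing it. As a rough idea of how small such a backend can be, the sketch below dispatches work to a fixed set of worker threads over a channel. It is a simplification under stated assumptions: it runs closures rather than futures, returns an `mpsc::Receiver` instead of `Task<R>`, and its constructor and method signatures are hypothetical.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Minimal pool: no labels, no task tracking, just dispatch.
pub struct ThreadPoolScheduler {
    sender: Option<Sender<Job>>,
    workers: Vec<JoinHandle<()>>,
}

impl ThreadPoolScheduler {
    pub fn new(threads: usize) -> Self {
        let (sender, receiver) = mpsc::channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..threads)
            .map(|_| {
                let receiver = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    // The lock is held only while receiving, not while running the job.
                    let job = receiver.lock().unwrap().recv();
                    match job {
                        Ok(job) => job(),
                        Err(_) => break, // channel closed: the pool is shutting down
                    }
                })
            })
            .collect();
        Self { sender: Some(sender), workers }
    }

    /// Run `work` on a worker thread; the receiver yields its result.
    pub fn spawn<R: Send + 'static>(
        &self,
        work: impl FnOnce() -> R + Send + 'static,
    ) -> Receiver<R> {
        let (tx, rx) = mpsc::channel();
        let job: Job = Box::new(move || {
            // Ignore the send error: the caller may have dropped the receiver.
            let _ = tx.send(work());
        });
        self.sender
            .as_ref()
            .expect("scheduler is shut down")
            .send(job)
            .expect("worker threads exited");
        rx
    }
}

impl Drop for ThreadPoolScheduler {
    fn drop(&mut self) {
        // Closing the channel makes each worker's `recv` fail, ending its loop.
        drop(self.sender.take());
        for worker in self.workers.drain(..) {
            let _ = worker.join();
        }
    }
}
```

Nothing here records which tasks are alive, which matches the "no task tracking overhead" goal for production backends.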

## GPUI Integration

```rust
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;

// BackgroundExecutor uses trait objects (production-safe)
pub struct BackgroundExecutor {
    scheduler: Arc<dyn Scheduler>,  // Any Scheduler implementation
}

impl BackgroundExecutor {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
        Self { scheduler }
    }

    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
    where R: Send + 'static {
        self.scheduler.spawn(future)
    }

    pub fn spawn_labeled<R>(
        &self,
        label: TaskLabel,
        future: impl Future<Output = R> + Send + 'static
    ) -> Task<R>
    where R: Send + 'static {
        self.scheduler.spawn_labeled(label, future)
    }

    // When GPUI needs test features, it downcasts to TestScheduler
    pub fn deprioritize(&self, label: TaskLabel) {
        // Downcast to access TestScheduler-specific features. This assumes `Scheduler`
        // exposes an `Any` hook such as `fn as_any(&self) -> &dyn Any`; trait objects
        // have no `downcast_ref` of their own.
        if let Some(test_scheduler) = self.scheduler.as_any().downcast_ref::<TestScheduler>() {
            test_scheduler.deprioritize(label);
        }
        // Production: do nothing
    }
}

// ForegroundExecutor also uses trait objects
pub struct ForegroundExecutor {
    scheduler: Rc<dyn Scheduler>,
}

impl ForegroundExecutor {
    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
    where R: 'static {
        self.scheduler.spawn_foreground(future)
    }
}
```

**Explanation:**
- GPUI executors use trait objects for production safety
- Test features accessed via downcasting to `TestScheduler`
- Production deployments can use minimal schedulers
- Test deployments get full test features
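
Downcasting from a trait object only works if the trait offers a route to `std::any::Any`. A minimal sketch of that pattern follows; the `as_any` method, the `ProductionScheduler` type, the `is_deprioritized` helper, and the `bool` return value of `deprioritize` are assumptions added for illustration and are not part of the trait defined earlier.

```rust
use std::any::Any;
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskLabel(pub usize);

pub trait Scheduler: Send + Sync {
    /// Hypothetical hook: exposes the concrete type for downcasting.
    fn as_any(&self) -> &dyn Any;
}

#[derive(Default)]
pub struct TestScheduler {
    deprioritized_labels: Mutex<HashSet<TaskLabel>>,
}

impl TestScheduler {
    pub fn deprioritize(&self, label: TaskLabel) {
        self.deprioritized_labels.lock().unwrap().insert(label);
    }

    pub fn is_deprioritized(&self, label: TaskLabel) -> bool {
        self.deprioritized_labels.lock().unwrap().contains(&label)
    }
}

impl Scheduler for TestScheduler {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Stand-in for a minimal production backend.
pub struct ProductionScheduler;

impl Scheduler for ProductionScheduler {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

pub struct BackgroundExecutor {
    scheduler: Arc<dyn Scheduler>,
}

impl BackgroundExecutor {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
        Self { scheduler }
    }

    /// Returns true if the label was applied, false if the scheduler has no test features.
    pub fn deprioritize(&self, label: TaskLabel) -> bool {
        match self.scheduler.as_any().downcast_ref::<TestScheduler>() {
            Some(test_scheduler) => {
                test_scheduler.deprioritize(label);
                true
            }
            None => false,
        }
    }
}
```

The trade-off is one extra method on every scheduler implementation in exchange for keeping `deprioritize()` itself off the trait.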

## Cloud Integration

```rust
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::sync::Arc;

use anyhow::{anyhow, Result};

// Cloud wrapper requires TestScheduler for session features
pub struct CloudSimulatedScheduler {
    test_scheduler: Arc<TestScheduler>,  // Concrete type for test features
    inner: RefCell<CloudSimulatedSchedulerInner>,
}

struct CloudSimulatedSchedulerInner {
    current_session: Option<SessionId>,
    sessions: HashMap<SessionId, SessionData>,
    task_to_session: HashMap<TaskId, SessionId>,
}

impl CloudSimulatedScheduler {
    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
    where R: Send + 'static {
        let task = self.test_scheduler.spawn(future);

        // Auto-associate with current session. Read `current_session` first so the shared
        // borrow is released before `borrow_mut()`; holding both would panic at runtime.
        // (`SessionId` is assumed to be `Copy`.)
        let current_session = self.inner.borrow().current_session;
        if let Some(session_id) = current_session {
            self.inner.borrow_mut().task_to_session.insert(task.id(), session_id);
            // Track in session...
        }

        task
    }

    pub fn validate_session_cleanup(&self, session_id: SessionId) -> Result<()> {
        // Use TestScheduler's task tracking for validation
        let inner = self.inner.borrow();

        if let Some(session) = inner.sessions.get(&session_id) {
            // Collect into a set so it can be compared with the `wait_until` set
            let running_tasks: HashSet<TaskId> = session
                .spawned_tasks
                .iter()
                .filter(|&&task_id| self.test_scheduler.is_task_running(task_id))
                .copied()
                .collect();

            // Check against explicit wait_until permissions
            // (assumes `wait_until_task_ids` is a `HashSet<TaskId>`)
            let mut unauthorized = running_tasks.difference(&session.wait_until_task_ids);

            if unauthorized.next().is_some() {
                return Err(anyhow!("Session cleanup failed: unauthorized tasks still running"));
            }
        }

        Ok(())
    }
}
```

**Explanation:**
- Cloud requires `TestScheduler` because session features need task tracking
- Auto-associates tasks with current session
- Uses `TestScheduler`'s `is_task_running()` for validation
- Session coordination is test-focused infrastructure
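
The cleanup rule (a session may end only if every task still running was explicitly allowed via `wait_until`) can be checked in isolation. The sketch below is a simplified model under stated assumptions: `SessionTracker`, its `running` set (a stand-in for `TestScheduler::is_task_running()`), and the `Result<(), Vec<TaskId>>` return type are hypothetical and chosen so the example has no external dependencies.

```rust
use std::collections::{HashMap, HashSet};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskId(pub usize);

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SessionId(pub usize);

#[derive(Default)]
pub struct SessionData {
    spawned_tasks: HashSet<TaskId>,
    wait_until_task_ids: HashSet<TaskId>,
}

#[derive(Default)]
pub struct SessionTracker {
    sessions: HashMap<SessionId, SessionData>,
    /// Stand-in for the scheduler's own task tracking.
    running: HashSet<TaskId>,
}

impl SessionTracker {
    /// Record a task as spawned inside `session` and mark it as running.
    pub fn spawn(&mut self, session: SessionId, task: TaskId) {
        self.sessions.entry(session).or_default().spawned_tasks.insert(task);
        self.running.insert(task);
    }

    /// Allow `task` to outlive the session.
    pub fn wait_until(&mut self, session: SessionId, task: TaskId) {
        self.sessions.entry(session).or_default().wait_until_task_ids.insert(task);
    }

    /// Mark a task as finished.
    pub fn complete(&mut self, task: TaskId) {
        self.running.remove(&task);
    }

    /// Ok if every still-running task of the session was passed to `wait_until`;
    /// otherwise the offending task ids, sorted for stable output.
    pub fn validate_session_cleanup(&self, session_id: SessionId) -> Result<(), Vec<TaskId>> {
        let session = match self.sessions.get(&session_id) {
            Some(session) => session,
            None => return Ok(()), // unknown session: nothing to validate
        };
        let mut unauthorized: Vec<TaskId> = session
            .spawned_tasks
            .iter()
            .copied()
            .filter(|id| self.running.contains(id) && !session.wait_until_task_ids.contains(id))
            .collect();
        unauthorized.sort_by_key(|id| id.0);
        if unauthorized.is_empty() {
            Ok(())
        } else {
            Err(unauthorized)
        }
    }
}
```

Returning the offending ids, rather than only a message, is a design choice of this sketch that would make failed cleanup easier to debug in tests.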

## Migration Strategy

### Phase 1: Core Infrastructure
1. Define `Scheduler` trait (core methods only)
2. Implement `TestScheduler` (with test features like `deprioritize()`)
3. Implement production schedulers (GCD, ThreadPool)
4. Define `Task<T>` with mandatory `TaskId`

### Phase 2: GPUI Migration
1. Update GPUI executors to use trait objects
2. Add downcasting for test features
3. Preserve all existing GPUI functionality
4. Test deployments use `TestScheduler`, production uses minimal schedulers

### Phase 3: Cloud Integration
1. Cloud wrapper uses `TestScheduler` for session coordination
2. Maintain automatic session association
3. Preserve `wait_until` and validation behavior
4. Application code unchanged

### Phase 4: Validation
1. GPUI tests work with new architecture
2. Cloud session behavior preserved
3. Production efficiency maintained

## Benefits

- ✅ **Clean Separation**: Test methods only on `TestScheduler`
- ✅ **Production Safety**: GPUI executors use trait objects
- ✅ **Session Intelligence**: Cloud gets full coordination features
- ✅ **Flexible Architecture**: Production vs test deployments
- ✅ **Backward Compatibility**: All existing functionality preserved

This design keeps test concerns in `TestScheduler` while maintaining production safety and session coordination capabilities.

## Implementation Reference

### GPUI File Paths & Types

**Core Executor Files:**
- `crates/gpui/src/executor.rs` - `BackgroundExecutor`, `ForegroundExecutor`, `Task<T>`
- `crates/gpui/src/app/app.rs` - App-level executor access
- `crates/gpui/src/platform.rs` - `PlatformDispatcher` trait (current system)

**Types to Update:**
- `BackgroundExecutor` - Switch from `PlatformDispatcher` to `Arc<dyn Scheduler>`
- `ForegroundExecutor` - Switch from `PlatformDispatcher` to `Rc<dyn Scheduler>`
- `Task<T>` - Ensure compatibility with new `Task<T>` design

**Test Infrastructure:**
- `crates/gpui/src/platform/test/dispatcher.rs` - `TestDispatcher` (current)
- Will need new `TestScheduler` implementation
- `TaskLabel` usage in tests

### Cloud File Paths & Types

**Core Runtime Files:**
- `crates/platform_simulator/src/runtime.rs` - `SimulatorRuntime`, session management
- `crates/platform_simulator/src/platform.rs` - `SimulatedExecutionContext`
- `crates/platform_simulator/src/lib.rs` - Cloud worker setup

**Key Types:**
- `SessionId` - Session identification
- `WorkerSession` - Session state tracking
- `ExecutionContext::wait_until()` - Session coordination API
- `SimulatorRuntime::validate_session_cleanup()` - Cleanup validation

**Worker Files:**
- `crates/client_api/src/client_api.rs` - API endpoints using sessions
- `crates/cloud_worker/src/worker.rs` - Worker execution with sessions
- `crates/cloudflare_platform/src/execution_context.rs` - Platform-specific execution context

### Migration Points

**GPUI Areas:**
- Replace `cx.background_executor()` calls with new executors
- Update any direct `PlatformDispatcher` usage
- Preserve `spawn_labeled()` and `deprioritize()` APIs

**Cloud Areas:**
- Replace `SimulatorRuntime` usage in tests
- Update session management to use new scheduler wrapper
- Preserve `wait_until()` and session validation behavior

### Test Files Impacted

**GPUI Tests:**
- `crates/gpui/src/app/test_context.rs` - Test setup
- `crates/gpui_macros/src/test.rs` - Test macro generation
- Project-specific test files using `deprioritize()`

**Cloud Tests:**
- `crates/platform_simulator/src/lib.rs` - Test setup
- Worker test files using session features
- Session validation test cases

### Platform Backend Files

**macOS:**
- `crates/gpui/src/platform/mac/dispatcher.rs` - `MacDispatcher` → `GcdScheduler`

**Linux:**
- `crates/gpui/src/platform/linux/dispatcher.rs` - `LinuxDispatcher` → `ThreadPoolScheduler`

**Windows:**
- `crates/gpui/src/platform/windows/dispatcher.rs` - `WindowsDispatcher` → `ThreadPoolScheduler`

**Test:**
- `crates/gpui/src/platform/test/dispatcher.rs` - `TestDispatcher` → `TestScheduler`

## Compatibility Checklist

### GPUI Compatibility
- ✅ `spawn()` → `scheduler.spawn()`
- ✅ `spawn_labeled(label)` → `scheduler.spawn_labeled(label)`
- ✅ `deprioritize()` → Downcast to `TestScheduler`
- ✅ `timer()` → `scheduler.timer()`
- ✅ `BackgroundExecutor` → Trait object wrapper

### Cloud Compatibility
- ✅ `ExecutionContext::wait_until()` → Scheduler wrapper
- ✅ Session validation → `validate_session_cleanup()`
- ✅ Automatic session association → Wrapper intelligence
- ✅ Task cleanup checking → `TestScheduler` task tracking
- ✅ `spawn()` in sessions → Auto-association

### Test Compatibility
- ✅ Test determinism → `TestScheduler` deprioritization
- ✅ Task labeling → `TestScheduler` `spawn_labeled()` override
- ✅ Session coordination → Cloud wrapper
- ✅ Production efficiency → Minimal scheduler implementations

## Next Steps

1. **Create shared scheduler crate** with core types
2. **Implement TestScheduler** with task tracking and test features
3. **Update GPUI executors** to use trait objects
4. **Create Cloud wrapper** with session coordination
5. **Migrate platform backends** to new scheduler implementations
6. **Update tests** to use new architecture
7. **Validate performance** and backward compatibility