1# Unified Scheduler Architecture - Layered Design
2
3## Overview
4
5A clean layered architecture where:
6- **Core**: Basic scheduling interface (`Scheduler` trait) + test-enhanced concrete impl (`TestScheduler`)
7- **GPUI**: Uses trait objects for production safety, test features via `TestScheduler`
8- **Cloud**: Session wrapper uses `TestScheduler` for session coordination
9
10Key design principles:
11- Main `Scheduler` trait has only essential methods (no test pollution)
12- Test-specific features (deprioritization, task tracking) are `TestScheduler`-specific
13- Production schedulers implement minimal interface
14- Cloud requires `TestScheduler` for session features
15
16## Core Architecture
17
18```
19βββββββββββββββββββββββββββββββββββββββββββ
20β Shared Crate β
21βββββββββββββββββββββββββββββββββββββββββββ€
22β Scheduler trait: β
23β - spawn() - core Send futures β
24β - spawn_foreground() - non-Send β
25β - Platform integration (park, now) β
26β β
27β TestScheduler: β
28β - Implements Scheduler + test features β
29β - deprioritize() - test-only method β
30β - spawn_labeled() - labels for testing β
31β β
32β GcdScheduler/ThreadPoolScheduler: β
33β - Minimal Scheduler implementations β
34βββββββββββββββββββββββββββββββββββββββββββ
35 β²
36 βββββββββββΌββββββββββ
37 β β β
38βββββββββββ΄βββββ βββ΄ββββββββββ΄βββββ
39β GPUI β β Cloud β
40β Uses trait β β CloudSimulated β
41β objects β β uses Test- β
42β + TestSchedulerβ β Scheduler β
43ββββββββββββββββ βββββββββββββββββββ
44```
45
46## Scheduler Trait Definition
47
48```rust
49pub trait Scheduler: Send + Sync {
50 /// Spawn Send future (core functionality)
51 fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>;
52
53 /// Spawn Send future with label (defaults to ignoring label)
54 fn spawn_labeled<R>(
55 &self,
56 _label: TaskLabel,
57 future: impl Future<Output = R> + Send + 'static
58 ) -> Task<R>
59 where R: Send + 'static {
60 // Default: ignore label and just spawn normally
61 self.spawn(future)
62 }
63
64 /// Spawn non-Send future on main thread (optional, defaults to panic)
65 fn spawn_foreground<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
66 where R: 'static {
67 panic!("spawn_foreground not supported by this scheduler");
68 }
69
70 /// Platform integration methods
71 fn park(&self, timeout: Option<Duration>) -> bool { false }
72 fn unparker(&self) -> Unparker { Arc::new(|_| {}).into() }
73 fn is_main_thread(&self) -> bool;
74 fn now(&self) -> Instant;
75}
76```
77
78**Explanation:**
79- Core trait has only essential methods
80- No test-specific methods (deprioritize stays off main trait)
81- Production schedulers implement minimal interface
82- Test features are concrete `TestScheduler` methods
83
84## Task<T> Definition
85
86```rust
87#[derive(Debug)]
88pub struct Task<T> {
89 inner: TaskState<T>,
90 id: TaskId, // Mandatory for coordination
91 metadata: TaskMetadata,
92}
93
94#[derive(Debug)]
95pub struct TaskMetadata {
96 label: Option<TaskLabel>, // GPUI test identification
97 session: Option<SessionId>, // Cloud session association
98 spawn_location: Option<&'static std::panic::Location>,
99}
100
101#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
102pub struct TaskId(pub usize);
103
104#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
105pub struct TaskLabel(NonZeroUsize);
106```
107
108**Explanation:**
109- Mandatory TaskId for session coordination
110- Optional metadata for GPUI labels and Cloud sessions
111- Task implements Future directly
112
113## TestScheduler (Concrete Implementation)
114
115```rust
116pub struct TestScheduler {
117 inner: RefCell<TestSchedulerInner>,
118}
119
120struct TestSchedulerInner {
121 tasks: HashMap<TaskId, TaskState>,
122 deprioritized_labels: HashSet<TaskLabel>, // Test-specific state
123 delayed: Vec<(Instant, Runnable)>,
124 parker: Parker,
125 is_main_thread: bool,
126 now: Instant,
127 next_task_id: AtomicUsize,
128}
129
130impl Scheduler for TestScheduler {
131 fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R> {
132 let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst));
133 let task = self.create_task(future, task_id);
134 self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running);
135 task
136 }
137
138 fn spawn_foreground<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R> {
139 assert!(self.is_main_thread(), "spawn_foreground called off main thread");
140 let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst));
141 let task = self.create_local_task(future, task_id);
142 self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running);
143 task
144 }
145
146 fn is_main_thread(&self) -> bool { self.inner.borrow().is_main_thread }
147 fn now(&self) -> Instant { self.inner.borrow().now }
148 fn park(&self, timeout: Option<Duration>) -> bool {
149 self.inner.borrow().parker.park_timeout(timeout.unwrap_or(Duration::MAX))
150 }
151 fn spawn_labeled<R>(
152 &self,
153 label: TaskLabel,
154 future: impl Future<Output = R> + Send + 'static
155 ) -> Task<R>
156 where R: Send + 'static {
157 let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst));
158 let task = self.create_task_with_label(future, task_id, label);
159
160 // Apply deprioritization if label is registered
161 if self.inner.borrow().deprioritized_labels.contains(&label) {
162 self.move_to_deprioritized_queue(task_id);
163 }
164
165 self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running);
166 task
167 }
168
169 fn unparker(&self) -> Unparker {
170 self.inner.borrow().parker.unparker()
171 }
172}
173
174// Test-specific methods (NOT on main trait)
175impl TestScheduler {
176 pub fn deprioritize(&self, label: TaskLabel) {
177 self.inner.borrow_mut().deprioritized_labels.insert(label);
178 }
179
180 pub fn spawn_labeled<R>(
181 &self,
182 label: TaskLabel,
183 future: impl Future<Output = R> + Send + 'static
184 ) -> Task<R> {
185 let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst));
186 let mut task = self.create_task(future, task_id);
187 task.metadata.label = Some(label); // Set label in metadata
188
189 // Apply deprioritization if label is registered
190 if self.inner.borrow().deprioritized_labels.contains(&label) {
191 self.move_to_deprioritized_queue(task_id);
192 }
193
194 self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running);
195 task
196 }
197
198 pub fn is_task_running(&self, task_id: TaskId) -> bool {
199 self.inner.borrow().tasks.contains_key(&task_id)
200 }
201
202 fn create_task<R>(&self, future: impl Future<Output = R> + Send + 'static, task_id: TaskId) -> Task<R> {
203 let (runnable, inner_task) = async_task::spawn(future, move |runnable| {
204 // Schedule to appropriate queue based on label
205 self.schedule_runnable(runnable, task_id);
206 });
207 runnable.schedule();
208
209 Task {
210 inner: TaskState::Spawned(inner_task),
211 id: task_id,
212 metadata: TaskMetadata {
213 label: None,
214 session: None,
215 spawn_location: Some(std::panic::Location::caller()),
216 },
217 }
218 }
219
220 fn create_task_with_label<R>(&self, future: impl Future<Output = R> + Send + 'static, task_id: TaskId, label: TaskLabel) -> Task<R>
221 where R: Send + 'static {
222 let (runnable, inner_task) = async_task::spawn(future, move |runnable| {
223 self.schedule_runnable_with_label(runnable, task_id, label);
224 });
225 runnable.schedule();
226
227 Task {
228 inner: TaskState::Spawned(inner_task),
229 id: task_id,
230 metadata: TaskMetadata {
231 label: Some(label),
232 session: None,
233 spawn_location: Some(std::panic::Location::caller()),
234 },
235 }
236 }
237
238 fn schedule_runnable_with_label(&self, runnable: Runnable, task_id: TaskId, label: TaskLabel) {
239 // TestScheduler-specific scheduling logic for labeled tasks
240 if self.inner.borrow().deprioritized_labels.contains(&label) {
241 // Put in deprioritized queue for test determinism
242 self.inner.borrow_mut().deprioritized_queue.push(runnable);
243 } else {
244 // Schedule normally
245 runnable.schedule();
246 }
247 }
248}
249```
250
251**Explanation:**
252- `deprioritize()` is a TestScheduler-specific method (not on main trait)
253- `spawn_labeled()` is TestScheduler-specific (not on main trait)
254- `is_task_running()` provides task status for Cloud session validation
255- Test-specific state stays in TestScheduler
256
257## Production Schedulers
258
259```rust
260pub struct GcdScheduler {
261 main_queue: dispatch_queue_t,
262 background_queue: dispatch_queue_t,
263 task_counter: AtomicUsize,
264}
265
266impl Scheduler for GcdScheduler {
267 fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R> {
268 let task_id = TaskId(self.task_counter.fetch_add(1, Ordering::SeqCst));
269 let (runnable, task) = async_task::spawn(future, move |runnable| {
270 unsafe { dispatch_async_f(self.background_queue, runnable.into_raw().as_ptr() as *mut c_void, Some(trampoline)); }
271 });
272 runnable.schedule();
273
274 Task {
275 inner: TaskState::Spawned(task),
276 id: task_id,
277 metadata: TaskMetadata::default(),
278 }
279 }
280
281 fn is_main_thread(&self) -> bool {
282 // macOS-specific main thread detection
283 unsafe { msg_send![class!(NSThread), isMainThread] }
284 }
285
286 fn now(&self) -> Instant { Instant::now() }
287}
288```
289
290**Explanation:**
291- Production schedulers implement only core `Scheduler` trait
292- No test-specific methods (deprioritize stays off main trait)
293- Minimal implementation, no task tracking overhead
294
295## GPUI Integration
296
297```rust
298// BackgroundExecutor uses trait objects (production-safe)
299pub struct BackgroundExecutor {
300 scheduler: Arc<dyn Scheduler>, // Any Scheduler implementation
301}
302
303impl BackgroundExecutor {
304 pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
305 Self { scheduler }
306 }
307
308 pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
309 where R: Send + 'static {
310 self.scheduler.spawn(future)
311 }
312
313 pub fn spawn_labeled<R>(
314 &self,
315 label: TaskLabel,
316 future: impl Future<Output = R> + Send + 'static
317 ) -> Task<R>
318 where R: Send + 'static {
319 self.scheduler.spawn_labeled(label, future)
320 }
321
322 // When GPUI needs test features, it downcasts to TestScheduler
323 pub fn deprioritize(&self, label: TaskLabel) {
324 // Downcast to access TestScheduler-specific features
325 if let Some(test_scheduler) = self.scheduler.downcast_ref::<TestScheduler>() {
326 test_scheduler.deprioritize(label);
327 } else {
328 // Production: do nothing
329 }
330 }
331}
332
333// ForegroundExecutor also uses trait objects
334pub struct ForegroundExecutor {
335 scheduler: Rc<dyn Scheduler>,
336}
337
338impl ForegroundExecutor {
339 pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
340 where R: 'static {
341 self.scheduler.spawn_foreground(future)
342 }
343}
344```
345
346**Explanation:**
347- GPUI executors use trait objects for production safety
348- Test features accessed via downcasting to TestScheduler
349- Production deployments can use minimal schedulers
350- Test deployments get full test features
351
352## Cloud Integration
353
354```rust
355// Cloud wrapper requires TestScheduler for session features
356pub struct CloudSimulatedScheduler {
357 test_scheduler: Arc<TestScheduler>, // Concrete type for test features
358 inner: RefCell<CloudSimulatedSchedulerInner>,
359}
360
361struct CloudSimulatedSchedulerInner {
362 current_session: Option<SessionId>,
363 sessions: HashMap<SessionId, SessionData>,
364 task_to_session: HashMap<TaskId, SessionId>,
365}
366
367impl CloudSimulatedScheduler {
368 pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R> {
369 let task = self.test_scheduler.spawn(future);
370
371 // Auto-associate with current session
372 if let Some(session_id) = self.inner.borrow().current_session {
373 self.inner.borrow_mut().task_to_session.insert(task.id(), session_id);
374 // Track in session...
375 }
376
377 task
378 }
379
380 pub fn validate_session_cleanup(&self, session_id: SessionId) -> Result<()> {
381 // Use TestScheduler's task tracking for validation
382 let inner = self.inner.borrow();
383
384 if let Some(session) = inner.sessions.get(&session_id) {
385 let running_tasks: Vec<TaskId> = session
386 .spawned_tasks
387 .iter()
388 .filter(|&&task_id| self.test_scheduler.is_task_running(task_id))
389 .copied()
390 .collect();
391
392 // Check against explicit wait_until permissions
393 let unauthorized = running_tasks.difference(&session.wait_until_task_ids);
394
395 if unauthorized.next().is_some() {
396 return Err(anyhow!("Session cleanup failed: unauthorized tasks still running"));
397 }
398 }
399
400 Ok(())
401 }
402}
403```
404
405**Explanation:**
406- Cloud requires `TestScheduler` because session features need task tracking
407- Auto-associates tasks with current session
408- Uses TestScheduler's `is_task_running()` for validation
409- Session coordination is test-focused infrastructure
410
411## Migration Strategy
412
413### Phase 1: Core Infrastructure
4141. Define `Scheduler` trait (core methods only)
4152. Implement `TestScheduler` (with test features like `deprioritize()`)
4163. Implement production schedulers (GCD, ThreadPool)
4174. Define `Task<T>` with mandatory TaskId
418
419### Phase 2: GPUI Migration
4201. Update GPUI executors to use trait objects
4212. Add downcasting for test features
4223. Preserve all existing GPUI functionality
4234. Test deployments use TestScheduler, production uses minimal schedulers
424
425### Phase 3: Cloud Integration
4261. Cloud wrapper uses TestScheduler for session coordination
4272. Maintain automatic session association
4283. Preserve `wait_until` and validation behavior
4294. Application code unchanged
430
431### Phase 4: Validation
4321. GPUI tests work with new architecture
4332. Cloud session behavior preserved
4343. Production efficiency maintained
435
436## Benefits
437
438β
**Clean Separation**: Test methods only on TestScheduler
439β
**Production Safety**: GPUI executors use trait objects
440β
**Session Intelligence**: Cloud gets full coordination features
441β
**Flexible Architecture**: Production vs test deployments
442β
**Backward Compatibility**: All existing functionality preserved
443
444This design keeps test concerns in TestScheduler while maintaining production safety and session coordination capabilities.
445
446## Implementation Reference
447
448### GPUI File Paths & Types
449
450**Core Executor Files:**
451- `crates/gpui/src/executor.rs` - `BackgroundExecutor`, `ForegroundExecutor`, `Task<T>`
452- `crates/gpui/src/app/app.rs` - App-level executor access
453- `crates/gpui/src/platform.rs` - `PlatformDispatcher` trait (current system)
454
455**Types to Update:**
456- `BackgroundExecutor` - Switch from `PlatformDispatcher` to `Arc<dyn Scheduler>`
457- `ForegroundExecutor` - Switch from `PlatformDispatcher` to `Rc<dyn Scheduler>`
458- `Task<T>` - Ensure compatibility with new `Task<T>` design
459
460**Test Infrastructure:**
461- `crates/gpui/src/platform/test/dispatcher.rs` - `TestDispatcher` (current)
462- Will need new `TestScheduler` implementation
463- `TaskLabel` usage in tests
464
465### Cloud File Paths & Types
466
467**Core Runtime Files:**
468- `crates/platform_simulator/src/runtime.rs` - `SimulatorRuntime`, session management
469- `crates/platform_simulator/src/platform.rs` - `SimulatedExecutionContext`
470- `crates/platform_simulator/src/lib.rs` - Cloud worker setup
471
472**Key Types:**
473- `SessionId` - Session identification
474- `WorkerSession` - Session state tracking
475- `ExecutionContext::wait_until()` - Session coordination API
476- `SimulatorRuntime::validate_session_cleanup()` - Cleanup validation
477
478**Worker Files:**
479- `crates/client_api/src/client_api.rs` - API endpoints using sessions
480- `crates/cloud_worker/src/worker.rs` - Worker execution with sessions
481- `crates/cloudflare_platform/src/execution_context.rs` - Platform-specific execution context
482
483### Migration Points
484
485**GPUI Areas:**
486- Replace `cx.background_executor()` calls with new executors
487- Update any direct `PlatformDispatcher` usage
488- Preserve `spawn_labeled()` and `deprioritize()` APIs
489
490**Cloud Areas:**
491- Replace `SimulatorRuntime` usage in tests
492- Update session management to use new scheduler wrapper
493- Preserve `wait_until()` and session validation behavior
494
495### Test Files Impacted
496
497**GPUI Tests:**
498- `crates/gpui/src/app/test_context.rs` - Test setup
499- `crates/gpui_macros/src/test.rs` - Test macro generation
500- Project-specific test files using `deprioritize()`
501
502**Cloud Tests:**
503- `crates/platform_simulator/src/lib.rs` - Test setup
504- Worker test files using session features
505- Session validation test cases
506
507### Platform Backend Files
508
509**macOS:**
510- `crates/gpui/src/platform/mac/dispatcher.rs` - `MacDispatcher` β `GcdScheduler`
511
512**Linux:**
513- `crates/gpui/src/platform/linux/dispatcher.rs` - `LinuxDispatcher` β `ThreadPoolScheduler`
514
515**Windows:**
516- `crates/gpui/src/platform/windows/dispatcher.rs` - `WindowsDispatcher` β `ThreadPoolScheduler`
517
518**Test:**
519- `crates/gpui/src/platform/test/dispatcher.rs` - `TestDispatcher` β `TestScheduler`
520
521## Compatibility Checklist
522
523### GPUI Compatibility
524- β
`spawn()` β `scheduler.spawn()`
525- β
`spawn_labeled(label)` β `scheduler.spawn_labeled(label)`
526- β
`deprioritize()` β Downcast to TestScheduler
527- β
`timer()` β `scheduler.timer()`
528- β
`BackgroundExecutor` β Trait object wrapper
529
530### Cloud Compatibility
531- β
`ExecutionContext::wait_until()` β Scheduler wrapper
532- β
Session validation β `validate_session_cleanup()`
533- β
Automatic session association β Wrapper intelligence
534- β
Task cleanup checking β TestScheduler task tracking
535- β
`spawn()` in sessions β Auto-association
536
537### Test Compatibility
538- β
Test determinism β TestScheduler deprioritization
539- β
Task labeling β TestScheduler spawn_labeled override
540- β
Session coordination β Cloud wrapper
541- β
Production efficiency β Minimal scheduler implementations
542
543## Next Steps
544
5451. **Create shared scheduler crate** with core types
5462. **Implement TestScheduler** with task tracking and test features
5473. **Update GPUI executors** to use trait objects
5484. **Create Cloud wrapper** with session coordination
5495. **Migrate platform backends** to new scheduler implementations
5506. **Update tests** to use new architecture
5517. **Validate performance** and backward compatibility