# Unified Scheduler Architecture - Layered Design

## Overview

A clean layered architecture where:
- **Core**: Basic scheduling interface (`Scheduler` trait) + test-enhanced concrete impl (`TestScheduler`)
- **GPUI**: Uses trait objects for production safety, test features via `TestScheduler`
- **Cloud**: Session coordination integrated directly in `SimulatedExecutionContext` using unified scheduler primitives

Key design principles:
- Main `Scheduler` trait has only essential methods (no test pollution)
- Test-specific features (deprioritization, task tracking) live only on `TestScheduler`
- Production schedulers implement only the minimal interface
- Cloud handles session coordination at the ExecutionContext layer using unified primitives

## Core Architecture

```
┌──────────────────────────────────────────────┐
│ Shared Crate                                 │
├──────────────────────────────────────────────┤
│ Scheduler trait:                             │
│   - Core object-safe interface               │
│   - Platform integration (park, now)         │
│                                              │
│ TestScheduler:                               │
│   - Implements Scheduler + test features     │
│   - deprioritize() - test-only method        │
│   - spawn_labeled() - labels for testing     │
│   - Task lifecycle tracking                  │
│                                              │
│ Generic spawn helpers:                       │
│   - spawn() / spawn_foreground()             │
│   - timer(), block(), block_with_timeout()   │
│   - Future-based API for trait objects       │
└──────────────────────────────────────────────┘
                       ▲
            ┌──────────┴──────────┐
            │                     │
   ┌────────┴────────┐   ┌────────┴────────┐
   │ GPUI            │   │ Cloud           │
   │ Uses trait      │   │ Session coord.  │
   │ objects         │   │ in ExecContext  │
   │ + TestScheduler │   │ + TestScheduler │
   └─────────────────┘   └─────────────────┘
```

## GPUI Integration

### Platform Scheduler Implementations in GPUI

Platform-specific scheduler implementations **should remain in GPUI's platform modules**:

- **MacDispatcher**: Should implement the `Scheduler` trait; uses GCD APIs
- **LinuxDispatcher**: Should implement the `Scheduler` trait; uses thread pools + calloop
- **WindowsDispatcher**: Should implement the `Scheduler` trait; uses Windows ThreadPool APIs

**Rationale**: Platform implementations are substantial and deeply integrated with:
- Platform-specific threading APIs (GCD, Windows ThreadPool, etc.)
- GPUI's event loop integration (main thread messaging)
- Platform-specific performance optimizations

The shared crate provides only the trait definition and generic helpers, while platform-specific dispatchers implement the `Scheduler` trait directly in GPUI.

### BackgroundExecutor Integration

GPUI's executors will use trait objects for scheduling:

```rust
// crates/gpui/src/executor.rs
use std::{future::Future, sync::Arc, time::Duration};

// Shared-crate items; the crate name `scheduler` is a placeholder.
use scheduler::{Scheduler, Task, TaskLabel, TestScheduler};

pub struct BackgroundExecutor {
    scheduler: Arc<dyn Scheduler>, // Any Scheduler implementation via trait objects
}

impl BackgroundExecutor {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
        Self { scheduler }
    }

    // Core spawning methods via generic helpers
    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
    where
        R: Send + 'static,
    {
        // Generic spawn helper implemented on `dyn Scheduler`: full Future support
        self.scheduler.spawn(future)
    }

    pub fn spawn_labeled<R>(
        &self,
        label: TaskLabel,
        future: impl Future<Output = R> + Send + 'static,
    ) -> Task<R>
    where
        R: Send + 'static,
    {
        // Generic spawn_labeled helper implemented on `dyn Scheduler`
        self.scheduler.spawn_labeled(label, future)
    }

    // Timer functionality via generic helper using schedule_after
    pub fn timer(&self, duration: Duration) -> Task<()> {
        self.scheduler.timer(duration)
    }

    // Test-specific methods via downcast to TestScheduler. A `dyn Scheduler`
    // cannot be downcast directly; this assumes the trait exposes
    // `fn as_any(&self) -> &dyn Any`.
    pub fn deprioritize(&self, label: TaskLabel) {
        if let Some(test_scheduler) = self.scheduler.as_any().downcast_ref::<TestScheduler>() {
            test_scheduler.deprioritize(label);
        }
        // Production schedulers: ignored silently
    }

    // `None` in production, `Some(ran_a_task)` under TestScheduler
    pub fn tick(&self) -> Option<bool> {
        self.scheduler
            .as_any()
            .downcast_ref::<TestScheduler>()
            .map(|ts| ts.tick())
    }
}
```
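
The downcasts above only work if a `dyn Scheduler` can be viewed as `&dyn Any`; trait objects do not support `downcast_ref` on their own. Below is a minimal, std-only sketch of that pattern. It assumes the trait exposes an `as_any()` hook, and `Scheduler`, `PlatformScheduler`, `TestScheduler`, and `BackgroundExecutor` here are cut-down, hypothetical stand-ins rather than the real types:

```rust
use std::any::Any;
use std::sync::{Arc, Mutex};

// Cut-down stand-in for the shared `Scheduler` trait; `as_any` is the
// assumed downcast hook.
trait Scheduler: Send + Sync {
    fn as_any(&self) -> &dyn Any;
}

// Stand-in for a production dispatcher with no test features.
struct PlatformScheduler;

impl Scheduler for PlatformScheduler {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

#[derive(Default)]
struct TestScheduler {
    deprioritized: Mutex<Vec<String>>,
}

impl TestScheduler {
    // Test-only method: deliberately not on the trait.
    fn deprioritize(&self, label: &str) {
        self.deprioritized.lock().unwrap().push(label.to_string());
    }
}

impl Scheduler for TestScheduler {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

struct BackgroundExecutor {
    scheduler: Arc<dyn Scheduler>,
}

impl BackgroundExecutor {
    // Returns whether a TestScheduler received the call; production
    // schedulers fall through to a no-op.
    fn deprioritize(&self, label: &str) -> bool {
        match self.scheduler.as_any().downcast_ref::<TestScheduler>() {
            Some(test_scheduler) => {
                test_scheduler.deprioritize(label);
                true
            }
            None => false,
        }
    }
}
```

Returning a `bool` from `deprioritize` is only for illustration; the executor method in this document returns `()` and silently ignores non-test schedulers.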

### ForegroundExecutor Integration

```rust
// crates/gpui/src/executor.rs
use std::{future::Future, marker::PhantomData, rc::Rc, sync::Arc, time::Duration};

pub struct ForegroundExecutor {
    // `Arc` because async_task needs a `Send + Sync + 'static` schedule handle
    // even for local futures; the `Rc` marker keeps the executor itself `!Send`,
    // i.e. usable only on the thread that created it.
    scheduler: Arc<dyn Scheduler>,
    not_send: PhantomData<Rc<()>>,
}

impl ForegroundExecutor {
    // Core spawning for main thread (non-Send futures)
    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
    where
        R: 'static,
    {
        // Generic spawn_foreground helper implemented on `dyn Scheduler`
        self.scheduler.spawn_foreground(future)
    }

    // Timer and test methods same as BackgroundExecutor
    pub fn timer(&self, duration: Duration) -> Task<()> {
        self.scheduler.timer(duration)
    }
}
```

## Cloud Integration

### Session Coordination in SimulatedExecutionContext

Cloud's session coordination logic **should be handled directly within SimulatedExecutionContext**, keeping it close to the ExecutionContext trait implementation and avoiding unnecessary abstraction layers:

```rust
// crates/platform_simulator/src/platform.rs
use std::collections::HashMap;
use std::sync::{atomic::AtomicUsize, Arc};
use std::time::Duration;

use async_trait::async_trait;
// Assumed: the code calls `lock()` without `unwrap()`, as with parking_lot.
use parking_lot::Mutex;

pub struct SimulatedExecutionContext {
    scheduler: Arc<dyn Scheduler>, // Unified scheduler via composition
    session_counter: AtomicUsize,
    sessions: Mutex<HashMap<SessionId, WorkerSession>>,
    current_session: Mutex<Option<SessionId>>,
}

#[async_trait(?Send)]
impl PlatformRuntime for SimulatedExecutionContext {
    async fn delay(&self, duration: Duration) {
        // Delay through the unified scheduler's timer
        self.scheduler.timer(duration).await;
    }
}
```

### Wait Until Implementation

Session coordination is integrated directly with unified task scheduling:

```rust
use std::time::Duration;

use async_trait::async_trait;
use futures::future::LocalBoxFuture;

#[async_trait(?Send)]
impl ExecutionContext for SimulatedExecutionContext {
    fn wait_until(&self, future: LocalBoxFuture<'static, Result<()>>) -> Result<()> {
        // 1. Spawn using the unified scheduler. `LocalBoxFuture` is not `Send`,
        //    so it goes through `spawn_foreground`, and the task owns its own
        //    scheduler handle because a `'static` future cannot borrow `self`.
        let scheduler = self.scheduler.clone();
        let task = self.scheduler.spawn_foreground(async move {
            // Add delay via scheduler timer for deterministic simulation
            scheduler.timer(Duration::from_millis(10)).await;
            let _ = future.await;
        });
        // Assumes `Task` exposes its `TaskId` (Phase 1) and can be detached
        // rather than cancelled on drop.
        let task_id = task.id();
        task.detach();

        // 2. Register with session coordination. Copy the session id out first,
        //    then take the `sessions` lock once: calling `link_task_to_session`
        //    here would lock `sessions` a second time and deadlock.
        let current_session = *self.current_session.lock();
        if let Some(session_id) = current_session {
            if let Some(session) = self.sessions.lock().get_mut(&session_id) {
                session.wait_until_task_ids.insert(task_id);
                session.spawned_tasks.insert(task_id);
            }
        }

        Ok(())
    }

    async fn pass_through(&self) -> Result<()> {
        // Use unified scheduler's timer for delay
        self.scheduler.timer(Duration::from_millis(10)).await;
        Ok(())
    }
}
```
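
Registering a task against the current session touches two mutexes (`current_session`, then `sessions`), so guard lifetimes matter. In Rust, a guard created inside an `if let` scrutinee stays alive for the whole body of that `if let`, not just the scrutinee. The std-only sketch below demonstrates this with `std::sync::Mutex::try_lock` and a hypothetical `SessionId`; with a non-reentrant mutex, a blocking `lock()` at the same point would deadlock instead of failing:

```rust
use std::sync::Mutex;

// Hypothetical session id, mirroring the document's `SessionId`.
#[derive(Clone, Copy, Debug, PartialEq)]
struct SessionId(usize);

/// Reports whether `current_session` is still locked inside the body of an
/// `if let` whose scrutinee went through a temporary guard.
fn guard_held_in_if_let(current_session: &Mutex<Option<SessionId>>) -> bool {
    if let Some(_session_id) = *current_session.lock().unwrap() {
        // The temporary guard from the scrutinee is still alive here.
        return current_session.try_lock().is_err();
    }
    false
}

/// Same check after copying the id out in its own statement: the guard is
/// dropped at the end of the `let`, so the lock is free again.
fn guard_held_after_copy(current_session: &Mutex<Option<SessionId>>) -> bool {
    let current = *current_session.lock().unwrap();
    if let Some(_session_id) = current {
        return current_session.try_lock().is_err();
    }
    false
}
```

Copying the id into a local first is cheap when `SessionId` is `Copy`, and it keeps each lock's critical section to a single statement.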

### Session Coordination Methods

Core session operations are handled within SimulatedExecutionContext:

```rust
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use anyhow::anyhow;
use parking_lot::Mutex;

impl SimulatedExecutionContext {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
        Self {
            scheduler,
            session_counter: AtomicUsize::new(0),
            sessions: Mutex::new(HashMap::new()),
            current_session: Mutex::new(None),
        }
    }

    pub fn create_session(&self) -> SessionId {
        let session_counter = self.session_counter.fetch_add(1, Ordering::SeqCst);
        let session_id = SessionId(session_counter);

        self.sessions.lock().insert(session_id, WorkerSession {
            spawned_tasks: HashSet::new(),
            wait_until_task_ids: HashSet::new(),
        });

        session_id
    }

    pub fn with_session<F, R>(&self, session_id: SessionId, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        // Each guard is released at the end of its statement, so `f` can spawn
        // tasks that read `current_session` without deadlocking.
        *self.current_session.lock() = Some(session_id);
        let result = f();
        *self.current_session.lock() = None;
        result
    }

    pub fn validate_session_cleanup(&self, session_id: SessionId) -> platform_api::Result<()> {
        let sessions = self.sessions.lock();
        if let Some(session) = sessions.get(&session_id) {
            // Check running tasks using unified scheduler's task tracking
            let dangling_tasks: Vec<TaskId> = session
                .spawned_tasks
                .iter()
                .filter(|&&task_id| self.scheduler.is_task_running(task_id))
                .copied()
                .collect();

            // Cloud-specific permission check
            let unauthorized: Vec<_> = dangling_tasks
                .into_iter()
                .filter(|task_id| !session.wait_until_task_ids.contains(task_id))
                .collect();

            if !unauthorized.is_empty() {
                return Err(platform_api::WorkerError::Other(anyhow!(
                    "Session cleanup failed: {} unauthorized tasks still running",
                    unauthorized.len()
                )));
            }
        }
        Ok(())
    }

    // Link tasks to sessions during spawning
    fn link_task_to_session(&self, task_id: TaskId, session_id: SessionId) {
        if let Some(session) = self.sessions.lock().get_mut(&session_id) {
            session.spawned_tasks.insert(task_id);
        }
    }

    fn spawn_with_session(&self, future: Pin<Box<dyn Future<Output = ()>>>) -> TaskId {
        // The boxed future is not `Send`, so it runs through `spawn_foreground`.
        // Assumes `Task` exposes its `TaskId` and can be detached.
        let task = self.scheduler.spawn_foreground(future);
        let task_id = task.id();
        task.detach();

        // Auto-associate with the current session
        let current_session = *self.current_session.lock();
        if let Some(session_id) = current_session {
            self.link_task_to_session(task_id, session_id);
        }

        task_id
    }
}
```
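
The rule enforced by `validate_session_cleanup` reduces to a set computation: a session fails if any of its spawned tasks is still running without having been registered through `wait_until`. A std-only sketch of that rule, with hypothetical `TaskId`/`SessionId` newtypes and an injected `is_task_running` predicate standing in for the scheduler query:

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical newtypes mirroring the document's `TaskId` and `SessionId`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct TaskId(usize);

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct SessionId(usize);

#[derive(Default)]
struct WorkerSession {
    spawned_tasks: HashSet<TaskId>,
    wait_until_task_ids: HashSet<TaskId>,
}

/// Tasks that are still running but were never registered through
/// `wait_until`, sorted so the result is deterministic. An unknown session
/// yields no violations, matching `validate_session_cleanup`.
fn unauthorized_tasks(
    sessions: &HashMap<SessionId, WorkerSession>,
    session_id: SessionId,
    is_task_running: impl Fn(TaskId) -> bool,
) -> Vec<TaskId> {
    let Some(session) = sessions.get(&session_id) else {
        return Vec::new();
    };
    let mut unauthorized: Vec<TaskId> = session
        .spawned_tasks
        .iter()
        .copied()
        .filter(|&task_id| is_task_running(task_id))
        .filter(|task_id| !session.wait_until_task_ids.contains(task_id))
        .collect();
    unauthorized.sort_by_key(|task_id| task_id.0);
    unauthorized
}
```

Injecting the predicate keeps the rule testable without a scheduler; in the simulator it would be backed by `TestScheduler`'s task tracking.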

### Cloud-Specific Data Structures

```rust
// Session coordination is Cloud-specific but built on unified scheduler
pub struct WorkerSession {
    spawned_tasks: HashSet<TaskId>,       // Tracks tasks in session
    wait_until_task_ids: HashSet<TaskId>, // Explicitly allowed background tasks
}

impl SimulatedExecutionContext {
    pub fn set_current_session(&self, session_id: SessionId) {
        *self.current_session.lock() = Some(session_id);
    }

    pub fn get_current_session(&self) -> Option<SessionId> {
        *self.current_session.lock()
    }
}
```

### Architecture Benefits

- ✅ **Clean Composition**: Unified scheduling primitives + Cloud-specific session coordination
- ✅ **Unified Task Tracking**: Uses TestScheduler's `is_task_running()` for session validation
- ✅ **Natural Coupling**: Session coordination lives where ExecutionContext operates
- ✅ **Minimal Abstraction**: No additional coordinator layer needed
- ✅ **Cloud-Specific Concerns**: Session logic remains in Cloud repo
- ✅ **Test Integration**: Full TestScheduler features available for Cloud testing
- ✅ **Deterministic Simulation**: Session-aware timing and task ordering

## Scheduler Trait Definition

```rust
use std::any::Any;
use std::time::{Duration, Instant};

use async_task::Runnable;
use parking::Unparker; // assumed: the `parking` crate's unparker

pub trait Scheduler: Send + Sync {
    /// Schedule a runnable to be executed (object-safe core functionality)
    fn schedule(&self, runnable: Runnable);

    /// Schedule a runnable with label for test tracking
    fn schedule_labeled(&self, runnable: Runnable, label: TaskLabel);

    /// Schedule a runnable on the main thread (optional, defaults to panic)
    fn schedule_foreground(&self, _runnable: Runnable) {
        panic!("schedule_foreground not supported by this scheduler");
    }

    /// Schedule a runnable after a delay (object-safe for timers)
    fn schedule_after(&self, duration: Duration, runnable: Runnable);

    /// Platform integration methods
    fn park(&self, timeout: Option<Duration>) -> bool;
    fn unparker(&self) -> Unparker;
    fn is_main_thread(&self) -> bool;
    fn now(&self) -> Instant;

    /// Downcast hook so executors can reach `TestScheduler`-only methods
    /// through `dyn Scheduler` (implementations return `self`)
    fn as_any(&self) -> &dyn Any;
}
```
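
To show how small the production-facing surface is, here is a hypothetical thread-per-task backend written against a cut-down copy of the trait. It is std-only, so a boxed closure stands in for `async_task::Runnable`, and it is far simpler than the GCD or thread-pool dispatchers GPUI would actually use:

```rust
use std::thread;
use std::time::{Duration, Instant};

// Stand-in for `async_task::Runnable`: a boxed job that runs exactly once.
type Runnable = Box<dyn FnOnce() + Send + 'static>;

// Cut-down copy of the trait: only the methods a production backend needs
// for `spawn()` and `timer()`.
trait Scheduler: Send + Sync {
    fn schedule(&self, runnable: Runnable);
    fn schedule_after(&self, duration: Duration, runnable: Runnable);
    fn now(&self) -> Instant;
}

// Hypothetical backend: one OS thread per runnable.
struct ThreadScheduler;

impl Scheduler for ThreadScheduler {
    fn schedule(&self, runnable: Runnable) {
        thread::spawn(runnable);
    }

    fn schedule_after(&self, duration: Duration, runnable: Runnable) {
        thread::spawn(move || {
            thread::sleep(duration);
            runnable();
        });
    }

    fn now(&self) -> Instant {
        Instant::now()
    }
}
```

Spawning an OS thread per runnable is only for brevity; the point is that `schedule` and `schedule_after` are all a backend needs for the generic `spawn()` and `timer()` helpers to work on top of it.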

## TestScheduler (Concrete Implementation)

```rust
use std::any::Any;
use std::collections::{HashMap, HashSet, VecDeque};
use std::ops::RangeInclusive;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};

use async_task::Runnable;
use parking::{Parker, Unparker};
use parking_lot::Mutex;
use rand::{rngs::StdRng, Rng};

pub struct TestScheduler {
    // `Mutex` rather than `RefCell`: `Scheduler` requires `Send + Sync`.
    inner: Mutex<TestSchedulerInner>,
    next_task_id: AtomicUsize,
    // Kept outside `inner` so a parked thread never holds the scheduler lock.
    parker: Mutex<Parker>,
    unparker: Unparker,
}

struct TestSchedulerInner {
    tasks: HashMap<TaskId, TaskState>,
    task_labels: HashMap<TaskId, TaskLabel>,
    deprioritized_labels: HashSet<TaskLabel>,
    runnables: VecDeque<Runnable>,
    deprioritized_queue: VecDeque<Runnable>,
    main_thread_queue: VecDeque<Runnable>,
    delayed: Vec<(Instant, Runnable)>,
    is_main_thread: bool,
    now: Instant,
    rng: StdRng,
    waiting_tasks: HashSet<TaskId>,
    parking_allowed: bool,
    waiting_hint: Option<String>,
    block_tick_range: RangeInclusive<usize>,
}

impl Scheduler for TestScheduler {
    // Called on every wake-up of a task, so it only enqueues; TaskIds are
    // assigned once per task (see `register_task`).
    fn schedule(&self, runnable: Runnable) {
        self.inner.lock().runnables.push_back(runnable);
    }

    fn schedule_labeled(&self, runnable: Runnable, label: TaskLabel) {
        let mut inner = self.inner.lock();
        if inner.deprioritized_labels.contains(&label) {
            inner.deprioritized_queue.push_back(runnable);
        } else {
            inner.runnables.push_back(runnable);
        }
    }

    fn schedule_foreground(&self, runnable: Runnable) {
        assert!(self.is_main_thread(), "schedule_foreground called off main thread");
        self.inner.lock().main_thread_queue.push_back(runnable);
    }

    // Timers fire only when `advance_clock` moves virtual time past the deadline.
    fn schedule_after(&self, duration: Duration, runnable: Runnable) {
        let mut inner = self.inner.lock();
        let deadline = inner.now + duration;
        inner.delayed.push((deadline, runnable));
    }

    fn park(&self, timeout: Option<Duration>) -> bool {
        let parker = self.parker.lock();
        match timeout {
            Some(timeout) => parker.park_timeout(timeout),
            None => {
                parker.park();
                true
            }
        }
    }

    fn unparker(&self) -> Unparker {
        self.unparker.clone()
    }

    fn is_main_thread(&self) -> bool {
        self.inner.lock().is_main_thread
    }

    fn now(&self) -> Instant {
        self.inner.lock().now
    }

    // Downcast hook assumed on `Scheduler`
    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl TestScheduler {
    // Test-specific methods (NOT on main trait)
    pub fn deprioritize(&self, label: TaskLabel) {
        self.inner.lock().deprioritized_labels.insert(label);
    }

    // Hypothetical bookkeeping hooks for the spawn helpers: called once when a
    // task is created and once when its future completes.
    // `TaskState::Completed` is an assumed variant.
    pub fn register_task(&self, label: Option<TaskLabel>) -> TaskId {
        let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst));
        let mut inner = self.inner.lock();
        inner.tasks.insert(task_id, TaskState::Running);
        if let Some(label) = label {
            inner.task_labels.insert(task_id, label);
        }
        task_id
    }

    pub fn mark_task_completed(&self, task_id: TaskId) {
        self.inner.lock().tasks.insert(task_id, TaskState::Completed);
    }

    pub fn is_task_running(&self, task_id: TaskId) -> bool {
        matches!(self.inner.lock().tasks.get(&task_id), Some(TaskState::Running))
    }

    /// Runs one runnable: main-thread work first, then a seeded-random ready
    /// task, then deprioritized tasks. Returns `false` if nothing was runnable.
    pub fn tick(&self) -> bool {
        let mut guard = self.inner.lock();
        let inner = &mut *guard;
        let runnable = if let Some(runnable) = inner.main_thread_queue.pop_front() {
            Some(runnable)
        } else if !inner.runnables.is_empty() {
            let ix = inner.rng.gen_range(0..inner.runnables.len());
            inner.runnables.remove(ix)
        } else {
            inner.deprioritized_queue.pop_front()
        };
        // Run outside the lock: polling the task may schedule more work.
        drop(guard);
        match runnable {
            Some(runnable) => {
                runnable.run();
                true
            }
            None => false,
        }
    }

    pub fn run_until_parked(&self) {
        while self.tick() {}
    }

    /// Moves virtual time forward and makes every timer whose deadline has
    /// passed runnable.
    pub fn advance_clock(&self, duration: Duration) {
        let mut guard = self.inner.lock();
        let inner = &mut *guard;
        inner.now += duration;
        let now = inner.now;
        let (due, pending): (Vec<_>, Vec<_>) = inner
            .delayed
            .drain(..)
            .partition(|(deadline, _)| *deadline <= now);
        inner.delayed = pending;
        inner.runnables.extend(due.into_iter().map(|(_, runnable)| runnable));
    }
}
```
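
The queue policy behind `tick()` can be modeled without any async machinery. The sketch below is a simplified, hypothetical model (named jobs instead of runnables, a small xorshift generator instead of `StdRng`): ready jobs run in a seeded-random order, deprioritized labels run only once nothing else is ready, and the same seed reproduces the same interleaving:

```rust
use std::collections::{HashSet, VecDeque};

/// Small xorshift64 generator so the sketch needs no `rand` dependency.
struct XorShift(u64);

impl XorShift {
    fn next_u64(&mut self) -> u64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        x
    }
}

/// Hypothetical, simplified model of TestScheduler's run queues: jobs are
/// names instead of runnables.
struct TestQueue {
    rng: XorShift,
    ready: Vec<&'static str>,
    deprioritized: VecDeque<&'static str>,
    deprioritized_labels: HashSet<&'static str>,
}

impl TestQueue {
    fn new(seed: u64) -> Self {
        Self {
            rng: XorShift(seed.max(1)), // xorshift must not start from zero
            ready: Vec::new(),
            deprioritized: VecDeque::new(),
            deprioritized_labels: HashSet::new(),
        }
    }

    fn deprioritize(&mut self, label: &'static str) {
        self.deprioritized_labels.insert(label);
    }

    fn schedule_labeled(&mut self, job: &'static str, label: &'static str) {
        if self.deprioritized_labels.contains(&label) {
            self.deprioritized.push_back(job);
        } else {
            self.ready.push(job);
        }
    }

    /// Runs one job: a seeded-random ready job if any, otherwise the oldest
    /// deprioritized job. `None` means the queue is parked.
    fn tick(&mut self) -> Option<&'static str> {
        if !self.ready.is_empty() {
            let ix = (self.rng.next_u64() % self.ready.len() as u64) as usize;
            return Some(self.ready.swap_remove(ix));
        }
        self.deprioritized.pop_front()
    }

    fn run_until_parked(&mut self) -> Vec<&'static str> {
        let mut order = Vec::new();
        while let Some(job) = self.tick() {
            order.push(job);
        }
        order
    }
}
```

Because the only source of nondeterminism is the seeded generator, a failing interleaving can be replayed by rerunning with the same seed.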
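
## Generic Spawn Helpers

Generic spawn methods implemented for `dyn Scheduler`:

```rust
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

impl dyn Scheduler {
    // `self: &Arc<Self>` lets each schedule closure own a `'static` handle to
    // the scheduler, which async_task requires (`Send + Sync + 'static`).
    pub fn spawn<R>(self: &Arc<Self>, future: impl Future<Output = R> + Send + 'static) -> Task<R>
    where
        R: Send + 'static,
    {
        let scheduler = Arc::clone(self);
        // The schedule closure runs on every wake-up, so it only re-queues the
        // runnable; once-per-task bookkeeping (TaskId, tracking) belongs outside it.
        let (runnable, inner_task) =
            async_task::spawn(future, move |runnable| scheduler.schedule(runnable));
        runnable.schedule();
        Task { /* ... */ }
    }

    pub fn spawn_labeled<R>(
        self: &Arc<Self>,
        label: TaskLabel,
        future: impl Future<Output = R> + Send + 'static,
    ) -> Task<R>
    where
        R: Send + 'static,
    {
        let scheduler = Arc::clone(self);
        // Assumes `TaskLabel: Copy + Send + Sync` so every wake-up can reuse it.
        let (runnable, inner_task) = async_task::spawn(future, move |runnable| {
            scheduler.schedule_labeled(runnable, label)
        });
        runnable.schedule();
        Task { /* ... */ }
    }

    pub fn spawn_foreground<R>(self: &Arc<Self>, future: impl Future<Output = R> + 'static) -> Task<R>
    where
        R: 'static,
    {
        let scheduler = Arc::clone(self);
        // `spawn_local` accepts a non-`Send` future, but its schedule closure
        // must still be `Send + Sync`, hence `Arc` rather than `Rc`.
        let (runnable, inner_task) = async_task::spawn_local(future, move |runnable| {
            scheduler.schedule_foreground(runnable)
        });
        runnable.schedule();
        Task { /* ... */ }
    }

    pub fn timer(self: &Arc<Self>, duration: Duration) -> Task<()> {
        if duration.is_zero() {
            return Task::ready(());
        }

        let scheduler = Arc::clone(self);
        // The empty future finishes on its first poll, so delaying that first
        // schedule by `duration` is what turns the task into a timer.
        let (runnable, inner_task) = async_task::spawn(async {}, move |runnable| {
            scheduler.schedule_after(duration, runnable)
        });
        runnable.schedule();
        Task { /* ... */ }
    }

    pub fn is_task_running(&self, task_id: TaskId) -> bool {
        // Only TestScheduler tracks task lifecycles (reached through the
        // assumed `as_any()` hook); other schedulers report `false`.
        self.as_any()
            .downcast_ref::<TestScheduler>()
            .map_or(false, |ts| ts.is_task_running(task_id))
    }
}
```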
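
The `timer` helper defers a task's first schedule through `schedule_after`, so under `TestScheduler` timers fire only when virtual time advances. A hypothetical std-only model of that clock, using `Duration` offsets as virtual time (the 10 ms value mirrors the delay used by `wait_until`):

```rust
use std::time::Duration;

/// Hypothetical deterministic clock: `schedule_after` records a deadline in
/// virtual time and only `advance_clock` makes timers runnable.
struct VirtualClock {
    now: Duration, // virtual time elapsed since the test started
    delayed: Vec<(Duration, &'static str)>,
    ready: Vec<&'static str>,
}

impl VirtualClock {
    fn new() -> Self {
        Self { now: Duration::ZERO, delayed: Vec::new(), ready: Vec::new() }
    }

    fn schedule_after(&mut self, delay: Duration, job: &'static str) {
        if delay.is_zero() {
            // Mirrors the `timer` helper's fast path for zero durations.
            self.ready.push(job);
        } else {
            self.delayed.push((self.now + delay, job));
        }
    }

    fn advance_clock(&mut self, by: Duration) {
        self.now += by;
        let now = self.now;
        let (mut due, pending): (Vec<_>, Vec<_>) = self
            .delayed
            .drain(..)
            .partition(|&(deadline, _)| deadline <= now);
        self.delayed = pending;
        // Fire in deadline order; the sort is stable, so ties keep insertion order.
        due.sort_by_key(|&(deadline, _)| deadline);
        self.ready.extend(due.into_iter().map(|(_, job)| job));
    }
}
```

No wall-clock time passes in this model, which is what lets simulated delays such as `wait_until`'s 10 ms run instantly and in a reproducible order.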

## Migration Strategy

### Phase 1: Core Infrastructure
1. Define `Scheduler` trait (core methods only)
2. Implement `TestScheduler` (with test features like `deprioritize()`)
3. Make existing GPUI platform dispatchers implement `Scheduler` trait
   - MacDispatcher implements `Scheduler` for GCD integration
   - LinuxDispatcher implements `Scheduler` for thread pools
   - WindowsDispatcher implements `Scheduler` for Windows ThreadPool
4. Define `Task<T>` with mandatory TaskId

### Phase 2: GPUI Migration
1. Update GPUI executors to use trait objects
2. Add downcasting for test features
3. Preserve all existing GPUI functionality
4. Tests use TestScheduler; production uses minimal schedulers

### Phase 3: Cloud Integration
1. Update `SimulatedExecutionContext` to use `Arc<dyn Scheduler>`
2. Move session coordination logic into `SimulatedExecutionContext`
3. Integrate `wait_until()` with unified task scheduling
4. Use TestScheduler features for session validation
5. Preserve all existing Cloud platform APIs

### Phase 4: Testing & Validation
1. GPUI tests pass on the new architecture
2. Cloud session behavior preserved
3. Production efficiency maintained
4. Both domains benefit from unified test infrastructure

## Platform Backend Files

### GPUI Backends
- `crates/gpui/src/platform/mac/dispatcher.rs` - `MacDispatcher` should implement `Scheduler` trait
- `crates/gpui/src/platform/linux/dispatcher.rs` - `LinuxDispatcher` should implement `Scheduler` trait
- `crates/gpui/src/platform/windows/dispatcher.rs` - `WindowsDispatcher` should implement `Scheduler` trait
- `crates/gpui/src/platform/test/dispatcher.rs` - `TestDispatcher` → `TestScheduler` (moved to shared crate)

### Cloud Backends
- `crates/platform_simulator/src/platform.rs` - `SimulatedExecutionContext` should contain `Scheduler` + session coordination
- `crates/cloudflare_platform/src/execution_context.rs` - Cloudflare-specific ExecutionContext using Scheduler

## Compatibility Checklist: Complete GPUI + Cloud Feature Coverage

### GPUI Compatibility
- ✅ `spawn()` → `dyn Scheduler::spawn()` (generic helper on trait object)
- ✅ `spawn_labeled(label)` → `dyn Scheduler::spawn_labeled()` (generic helper on trait object)
- ✅ `timer(duration)` → `dyn Scheduler::timer()` (generic helper using schedule_after)
- ✅ `block(future)` → `dyn Scheduler::block()` (generic helper with parking)
- ✅ `block_with_timeout(future, timeout)` → `dyn Scheduler::block_with_timeout()` (generic helper)
- ✅ `now()` → `scheduler.now()` (direct trait object method)
- ✅ `is_main_thread()` → `scheduler.is_main_thread()` (direct trait object method)
- ✅ `num_cpus()` → `dyn Scheduler::num_cpus()` (generic helper)
- ✅ `deprioritize(label)` → Downcast to TestScheduler, then `TestScheduler::deprioritize()`
- ✅ `tick()` → Downcast to TestScheduler, then `TestScheduler::tick()`
- ✅ `run_until_parked()` → Downcast to TestScheduler, then `TestScheduler::run_until_parked()`
- ✅ `advance_clock(duration)` → Downcast to TestScheduler, then `TestScheduler::advance_clock()`
- ✅ `simulate_random_delay()` → Downcast to TestScheduler, then `TestScheduler::simulate_random_delay()`
- ✅ `BackgroundExecutor` → Trait object wrapper using `dyn Scheduler`

### Cloud Compatibility
- ✅ **Session Coordination**: `ExecutionContext.wait_until()` with direct session integration
- ✅ **Task Lifecycle**: Uses unified scheduler's `is_task_running()` for validation
- ✅ **Worker Management**: Session context and cleanup validation
- ✅ **Background Tasks**: Explicit permission system for long-running work
- ✅ **Deterministic Testing**: Full TestScheduler integration with session tracking
- ✅ **Platform Runtime**: `PlatformRuntime.delay()` via unified scheduler timer
- ✅ **Session Validation**: Dangling task detection with proper error reporting
- ✅ **Auto-Association**: Tasks automatically linked to sessions during spawn

### Unified Benefits
- ✅ **Clean Separation**: GPUI gets deprioritization, Cloud gets session coordination
- ✅ **Unified Task Tracking**: Both domains use `TestScheduler::is_task_running()` for validation
- ✅ **Composability**: Session coordination built on unified scheduling primitives
- ✅ **Domain-Specific**: Each domain handles its own coordination concerns
- ✅ **Test Infrastructure**: Shared deterministic testing capabilities
- ✅ **Production Ready**: Both domains can use minimal platform schedulers
- ✅ **Extensible**: New coordination patterns can be added without shared crate changes

## Implementation Notes

### Key Design Decisions

1. **GPUI**: Uses task labels for deterministic UI testing
2. **Cloud**: Uses session coordination for worker lifecycle management
3. **Shared**: Core scheduling primitives + TestScheduler for task tracking
4. **Integration**: Both domains use composition with unified scheduler

### Migration Considerations

- **Zero Breaking Changes**: Existing APIs preserved via generic helpers
- **Gradual Migration**: GPUI and Cloud can migrate independently
- **Test Preservation**: All existing test functionality maintained
- **Performance**: Trait objects add one dynamic dispatch per scheduling call, expected to be negligible in production

This architecture provides clean separation between GPUI's UI determinism needs and Cloud's session coordination requirements, while sharing the core task scheduling infrastructure.