2025-08-31_15-47-17_unified-scheduler-architecture.md

  1β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  2β”‚            Shared Crate                 β”‚
  3β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
  4β”‚ Scheduler trait:                        β”‚
  5β”‚ - Core object-safe interface             β”‚
  6β”‚                                         β”‚
  7β”‚ TestScheduler:                          β”‚
  8β”‚ - Should implement Scheduler + test features  β”‚
  9β”‚ - deprioritize() - test-only method     β”‚
 10β”‚ - spawn_labeled() - labels for testing  β”‚
 11β”‚ - Task lifecycle tracking               β”‚
 12β”‚ - creation_thread_id for Foreground checksβ”‚
 13β”‚                                         β”‚
 14β”‚ Executor wrappers:                      β”‚
 15β”‚ - Executor: Wraps Arc<dyn Scheduler>, Send futuresβ”‚
 16β”‚ - ForegroundExecutor: Wraps Arc<dyn Scheduler>, !Send, thread checksβ”‚
 17β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
 18                    β–²
 19          β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”
 20          β”‚         β”‚         β”‚
 21β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”  β”Œβ”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”
 22β”‚   GPUI       β”‚  β”‚     Cloud       β”‚
 23β”‚ Uses Executorβ”‚  β”‚ ForegroundExec β”‚
 24β”‚ + Foreground β”‚  β”‚ for single-thrdβ”‚
 25β”‚ + TestSchedulerβ”‚  β”‚ + TestSchedulerβ”‚
 26β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
 27```
 28
 29## GPUI Integration
 30
 31### Platform Scheduler Implementations in GPUI
 32
 33Platform-specific scheduler implementations **should remain in GPUI's platform modules**:
 34
 35- **MacDispatcher**: Should implement `Scheduler` trait, uses GCD APIs
 36- **LinuxDispatcher**: Should implement `Scheduler` trait, uses thread pools + calloop
 37- **WindowsDispatcher**: Should implement `Scheduler` trait, uses Windows ThreadPool APIs
 38
 39**Rationale**: Platform implementations are substantial and deeply integrated with:
 40- Platform-specific threading APIs (GCD, Windows ThreadPool, etc.)
 41- GPUI's event loop integration (main thread messaging)
 42- Platform-specific performance optimizations
 43
 44The shared crate provides only the trait definition and generic helpers, while platform-specific dispatchers implement the `Scheduler` trait directly in GPUI. **Wrappers handle delegation and thread safety.**
 45
 46### BackgroundExecutor Integration
 47
 48GPUI's executors now use wrappers:
 49
 50```rust
 51// crates/gpui/src/executor.rs
 52pub struct BackgroundExecutor {
 53    executor: Executor,  // Generic wrapper for background tasks (Send futures)
 54}
 55
 56impl BackgroundExecutor {
 57    pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
 58        Self { executor: Executor::new(scheduler) }
 59    }
 60
 61    // Core spawning methods via wrapper delegation
 62    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
 63    where R: Send + 'static {
 64        self.executor.spawn(future)
 65    }
 66
 67    pub fn spawn_labeled<R>(
 68        &self,
 69        label: TaskLabel,
 70        future: impl Future<Output = R> + Send + 'static
 71    ) -> Task<R>
 72    where R: Send + 'static {
 73        self.executor.spawn_labeled(label, future)
 74    }
 75
 76    // Timer functionality via wrapper
 77    pub fn timer(&self, duration: Duration) -> Task<()> {
 78        self.executor.timer(duration)
 79    }
 80
 81    // Test-specific methods via downcast in wrapper
 82    pub fn deprioritize(&self, label: TaskLabel) {
 83        self.executor.deprioritize(label);
 84    }
 85
 86    pub fn tick(&self) -> Option<bool> {
 87        self.executor.tick()
 88    }
 89}
 90```
 91
 92### ForegroundExecutor Integration
 93
 94GPUI's foreground executor enforces main-thread usage:
 95
 96```rust
 97// crates/gpui/src/executor.rs
 98pub struct ForegroundExecutor {
 99    executor: Executor,  // Underlying executor for delegation
100    _phantom: PhantomData<Rc<()>>,  // Enforces !Send
101    creation_thread_id: ThreadId,  // Stored for checks
102}
103
104impl !Send for ForegroundExecutor {}  // Explicitly !Send
105
106impl ForegroundExecutor {
107    pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
108        let creation_thread_id = thread::current().id();
109        // Delegate creation to underlying scheduler
110        let _ = Executor::new(scheduler.clone());
111        Ok(Self { executor: Executor::new(scheduler), _phantom: PhantomData, creation_thread_id })
112    }
113
114    // Core spawning for main thread (non-Send futures)
115    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
116    where R: 'static {
117        if thread::current().id() != self.creation_thread_id {
118            panic!("ForegroundExecutor called off main thread");
119        }
120        // Delegate to scheduler.spawn_foreground via wrapper
121        self.executor.scheduler.spawn_foreground(future)
122    }
123
124    // Timer and test methods same as BackgroundExecutor but with thread checks
125    pub fn timer(&self, duration: Duration) -> Task<()> {
126        if thread::current().id() != self.creation_thread_id {
127            panic!("ForegroundExecutor called off main thread");
128        }
129        self.executor.timer(duration)
130    }
131}
132```
133
134## Cloud Integration
135
136### Session Coordination in SimulatedExecutionContext
137
138Cloud's session coordination logic **should be handled directly within SimulatedExecutionContext**, keeping it close to the ExecutionContext trait implementation and avoiding unnecessary abstraction layers. **Uses ForegroundExecutor for single-threaded consistency and to avoid Send requirements on futures.**
139
140```rust
141// crates/platform_simulator/src/platform.rs
142pub struct SimulatedExecutionContext {
143    fg_executor: ForegroundExecutor,  // Single-threaded wrapper for simplicity
144    session_counter: AtomicUsize,
145    sessions: Mutex<HashMap<SessionId, WorkerSession>>,
146    current_session: Mutex<Option<SessionId>>,
147}
148
149#[async_trait(?Send)]
150impl PlatformRuntime for SimulatedExecutionContext {
151    async fn delay(&self, duration: Duration) {
152        // Use wrapper's timer for delay
153        self.fg_executor.timer(duration).await;
154    }
155}
156```
157
158### Wait Until Implementation
159
160Session coordination integrated directly with wrapper's task scheduling:
161
162```rust
163#[async_trait(?Send)]
164impl ExecutionContext for SimulatedExecutionContext {
165    fn wait_until(&self, future: LocalBoxFuture<'static, Result<()>>) -> Result<()> {
166        // 1. Spawn using wrapper (no Send required)
167        let task = self.fg_executor.spawn(async move { future.await })?;
168        
169        // 2. Register with session coordination via downcast
170        if let Some(test_sched) = self.fg_executor.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
171            if let Some(session_id) = test_sched.get_current_session() {
172                test_sched.track_task_for_session(task.id(), session_id);
173                test_sched.add_wait_until_task(session_id, task.id());
174            }
175        }
176
177        Ok(())
178    }
179
180    async fn pass_through(&self) -> Result<()> {
181        // Use wrapper's timer for delay
182        self.fg_executor.timer(Duration::from_millis(10)).await;
183        Ok(())
184    }
185}
186```
187
188### Session Coordination Methods
189
190Core session operations handled within SimulatedExecutionContext:
191
192```rust
193impl SimulatedExecutionContext {
194    pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
195        let fg_executor = ForegroundExecutor::new(scheduler)?;
196        Ok(Self {
197            fg_executor,
198            session_counter: AtomicUsize::new(0),
199            sessions: Mutex::new(HashMap::new()),
200            current_session: Mutex::new(None),
201        })
202    }
203
204    pub fn create_session(&self) -> SessionId {
205        let session_counter = self.session_counter.fetch_add(1, Ordering::SeqCst);
206        let session_id = SessionId(session_counter);
207
208        self.sessions.lock().insert(session_id, WorkerSession {
209            spawned_tasks: HashSet::new(),
210            wait_until_task_ids: HashSet::new(),
211        });
212
213        session_id
214    }
215
216    pub fn with_session<F, R>(&self, session_id: SessionId, f: F) -> R
217    where
218        F: FnOnce() -> R,
219    {
220        {
221            let mut current = self.current_session.lock();
222            *current = Some(session_id);
223        }
224
225        let result = f();
226
227        {
228            let mut current = self.current_session.lock();
229            *current = None;
230        }
231
232        result
233    }
234
235    pub fn validate_session_cleanup(&self, session_id: SessionId) -> platform_api::Result<()> {
236        let sessions = self.sessions.lock();
237        if let Some(session) = sessions.get(&session_id) {
238            // Check running tasks using wrapper's TestScheduler access
239            if let Some(test_sched) = self.fg_executor.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
240                let dangling_tasks: Vec<TaskId> = session
241                    .spawned_tasks
242                    .iter()
243                    .filter(|&&task_id| test_sched.is_task_running(task_id))
244                    .copied()
245                    .collect();
246
247                // Cloud-specific permission check
248                let unauthorized: Vec<_> = dangling_tasks
249                    .into_iter()
250                    .filter(|task_id| !session.wait_until_task_ids.contains(task_id))
251                    .collect();
252
253                if !unauthorized.is_empty() {
254                    return Err(platform_api::WorkerError::Other(anyhow!(
255                        "Session cleanup failed: {} unauthorized tasks still running",
256                        unauthorized.len()
257                    )));
258                }
259            }
260        }
261        Ok(())
262    }
263
264    pub fn set_current_session(&self, session_id: SessionId) {
265        *self.current_session.lock() = Some(session_id);
266    }
267
268    pub fn get_current_session(&self) -> Option<SessionId> {
269        *self.current_session.lock()
270    }
271}
272```
273
274### Cloud-Specific Data Structures
275
276Session coordination is Cloud-specific but built on unified scheduler primitives via wrappers.
277
278## Scheduler Trait Definition
279
280```rust
281pub trait Scheduler: Send + Sync {
282    /// Schedule a runnable to be executed (object-safe core functionality)
283    fn schedule(&self, runnable: Runnable);
284
285    /// Schedule a runnable with label for test tracking
286    fn schedule_labeled(&self, runnable: Runnable, label: TaskLabel);
287
288    /// Schedule a runnable on the main thread (optional, defaults to panic)
289    fn schedule_foreground(&self, runnable: Runnable) {
290        panic!("schedule_foreground not supported by this scheduler");
291    }
292
293    /// Schedule a runnable after a delay (object-safe for timers)
294    fn schedule_after(&self, duration: Duration, runnable: Runnable);
295
296    /// Platform integration methods
297    fn park(&self, timeout: Option<Duration>) -> bool;
298    fn unparker(&self) -> Unparker;
299    fn is_main_thread(&self) -> bool;
300    fn now(&self) -> Instant;
301}
302
303
304
305## TestScheduler (Concrete Implementation)
306
307```rust
308pub struct TestScheduler {
309    inner: RefCell<TestSchedulerInner>,
310}
311
312struct TestSchedulerInner {
313    tasks: HashMap<TaskId, TaskState>,
314    task_labels: HashMap<TaskId, TaskLabel>,
315    deprioritized_labels: HashSet<TaskLabel>,
316    deprioritized_queue: VecDeque<(Runnable, TaskId)>,
317    main_thread_queue: VecDeque<Runnable>,
318    delayed: Vec<(Instant, Runnable)>,
319    parker: Parker,
320    is_main_thread: bool,
321    now: Instant,
322    next_task_id: AtomicUsize,
323    rng: StdRng,
324    waiting_tasks: HashSet<TaskId>,
325    parking_allowed: bool,
326    waiting_hint: Option<String>,
327    block_tick_range: std::ops::RangeInclusive<usize>,
328    creation_thread_id: ThreadId,  // Added for wrapper checks
329}
330
331impl Scheduler for TestScheduler {
332    fn schedule(&self, runnable: Runnable) {
333        // Implementation as before
334    }
335
336    fn schedule_labeled(&self, runnable: Runnable, label: TaskLabel) {
337        // Implementation as before, with deprioritization
338    }
339
340    fn schedule_foreground(&self, runnable: Runnable) {
341        assert!(thread::current().id() == self.inner.borrow().creation_thread_id, "schedule_foreground called off main thread");
342        // Implementation as before
343    }
344
345    // Other trait methods unchanged
346}
347
348impl TestScheduler {
349    // Test-specific methods (NOT on main trait)
350    pub fn deprioritize(&self, label: TaskLabel) { /* implementation */ }
351    pub fn is_task_running(&self, task_id: TaskId) -> bool { /* implementation */ }
352    pub fn tick(&self) -> bool { /* implementation */ }
353    pub fn run_until_parked(&self) { /* implementation */ }
354    pub fn advance_clock(&self, duration: Duration) { /* implementation */ }
355    pub fn assert_main_thread(&self) { /* implementation */ }
356}
357```
358
359## Generic Spawn Helpers
360
361Generic spawn methods implemented for `dyn Scheduler`, now called by wrappers.
362
363## Migration Strategy
364
365### Phase 1: Core Infrastructure
3661. Define `Scheduler` trait (core methods only)
3672. Implement `TestScheduler` with thread ID tracking
3683. Add wrapper structs `Executor` and `ForegroundExecutor`
3694. Make existing GPUI platform dispatchers implement `Scheduler` trait
3705. Add `as_any()` to `Scheduler` for downcasting
371
372### Phase 2: GPUI Migration
3731. Update GPUI executors to use `Executor` and `ForegroundExecutor` wrappers
3742. Handle downcasting in wrappers for test features
3753. Preserve all existing GPUI functionality
3764. Test deployments use TestScheduler, production uses minimal schedulers
377
378### Phase 3: Cloud Integration
3791. Update `SimulatedExecutionContext` to use `ForegroundExecutor`
3802. Move session coordination logic into `SimulatedExecutionContext`
3813. Integrate `wait_until()` with wrapper scheduling
3824. Use TestScheduler features for session validation via downcast
3835. Preserve all existing Cloud platform APIs
384
385### Phase 4: Testing & Validation
3861. GPUI tests work with new architecture
3872. Cloud session behavior preserved (single-threaded)
3883. Production efficiency maintained
3894. Both domains benefit from unified test infrastructure
390
391## Platform Backend Files
392
393### GPUI Backends
394- `crates/gpui/src/platform/mac/dispatcher.rs` - `MacDispatcher` implements `Scheduler`
395- `crates/gpui/src/platform/linux/dispatcher.rs` - `LinuxDispatcher` implements `Scheduler`
396- `crates/gpui/src/platform/windows/dispatcher.rs` - `WindowsDispatcher` implements `Scheduler`
397- `crates/gpui/src/platform/test/dispatcher.rs` - `TestDispatcher` β†’ `TestScheduler` (moved to shared crate)
398
399### Cloud Backends
400- `crates/platform_simulator/src/platform.rs` - `SimulatedExecutionContext` uses `ForegroundExecutor`
401- `crates/cloudflare_platform/src/execution_context.rs` - Cloudflare-specific ExecutionContext using `ForegroundExecutor`
402
403## Compatibility Checklist
404
405## Complete GPUI + Cloud Feature Coverage βœ…
406
407### GPUI Compatibility
408- βœ… `spawn()` β†’ `Executor::spawn()` or `ForegroundExecutor::spawn()`
409- βœ… `spawn_labeled(label)` β†’ Wrappers delegate to `dyn Scheduler::spawn_labeled()`
410- βœ… `timer(duration)` β†’ Wrappers delegate to `dyn Scheduler::timer()`
411- βœ… `block(future)` β†’ Wrappers handle with parking
412- βœ… `block_with_timeout(future, timeout)` β†’ Wrappers handle
413- βœ… `now()` β†’ `scheduler.now()` (direct trait method)
414- βœ… `is_main_thread()` β†’ `scheduler.is_main_thread()` (direct trait method)
415- βœ… `num_cpus()` β†’ Generic helper on wrappers
416- βœ… `deprioritize(label)` β†’ Downcast in wrappers, then TestScheduler::deprioritize()
417- βœ… `tick()` β†’ Downcast in wrappers, then TestScheduler::tick()
418- βœ… `run_until_parked()` β†’ Downcast in wrappers, then TestScheduler::run_until_parked()
419- βœ… `advance_clock(duration)` β†’ Downcast in wrappers, then TestScheduler::advance_clock()
420- βœ… `simulate_random_delay()` β†’ Downcast in wrappers, then TestScheduler::simulate_random_delay()
421- βœ… `BackgroundExecutor` β†’ Uses `Executor` wrapper
422
423### Cloud Compatibility
424- βœ… **Session Coordination**: `ExecutionContext.wait_until()` via `ForegroundExecutor`
425- βœ… **Task Lifecycle**: Uses wrapper's TestScheduler access for validation
426- βœ… **Worker Management**: Session context and cleanup validation
427- βœ… **Background Tasks**: Explicit permission system for long-running work
428- βœ… **Deterministic Testing**: Full TestScheduler integration with session tracking
429- βœ… **Platform Runtime**: `PlatformRuntime.delay()` via wrapper timer
430- βœ… **Session Validation**: Dangling task detection with proper error reporting
431- βœ… **Auto-Association**: Tasks automatically linked to sessions during spawn
432
433### Unified Benefits
434- βœ… **Clean Separation**: GPUI gets deprioritization, Cloud gets session coordination
435- βœ… **Unified Task Tracking**: Both domains use TestScheduler via wrappers for validation
436- βœ… **Composability**: Session coordination built on unified scheduling primitives
437- βœ… **Domain-Specific**: Each domain handles its coordination concerns appropriately
438- βœ… **Test Infrastructure**: Shared deterministic testing capabilities
439- βœ… **Production Ready**: Both domains can use minimal platform schedulers
440- βœ… **Extensible**: New coordination patterns can be added without shared crate changes
441- βœ… **Thread Safety**: ForegroundExecutor enforces main-thread use across domains
442
443## Implementation Notes
444
445### Key Design Decisions
446
4471. **GPUI**: Uses `Executor` for background (Send), `ForegroundExecutor` for main-thread (!Send)
4482. **Cloud**: Uses `ForegroundExecutor` for single-threaded simplicity (no Send required on futures)
4493. **Shared**: Core scheduling primitives + wrappers for delegation and safety
4504. **Integration**: Both domains use wrappers with consistent API
451
452### Migration Considerations
453
454- **Zero Breaking Changes**: Existing APIs preserved via wrappers
455- **Gradual Migration**: Can migrate GPUI and Cloud independently
456- **Test Preservation**: All existing test functionality maintained
457- **Performance**: Minimal overhead from trait objects in production
458- **Cloud Simplification**: ForegroundExecutor allows non-Send futures in single-threaded context
459
460This architecture provides clean separation between GPUI's UI determinism needs and Cloud's session coordination requirements, while sharing the core task scheduling infrastructure and enforcing thread safety through wrappers.