1βββββββββββββββββββββββββββββββββββββββββββ
2β Shared Crate β
3βββββββββββββββββββββββββββββββββββββββββββ€
4β Scheduler trait: β
5β - Core object-safe interface β
6β β
7β TestScheduler: β
8β - Should implement Scheduler + test features β
9β - deprioritize() - test-only method β
10β - spawn_labeled() - labels for testing β
11β - Task lifecycle tracking β
12β - creation_thread_id for Foreground checksβ
13β β
14β Executor wrappers: β
15β - Executor: Wraps Arc<dyn Scheduler>, Send futuresβ
16β - ForegroundExecutor: Wraps Arc<dyn Scheduler>, !Send, thread checksβ
17βββββββββββββββββββββββββββββββββββββββββββ
18 β²
19 βββββββββββΌββββββββββ
20 β β β
21βββββββββββ΄βββββ βββ΄ββββββββββ΄βββββ
22β GPUI β β Cloud β
23β Uses Executorβ β ForegroundExec β
24β + Foreground β β for single-thrdβ
25β + TestSchedulerβ β + TestSchedulerβ
26ββββββββββββββββ βββββββββββββββββββ
27```
28
29## GPUI Integration
30
31### Platform Scheduler Implementations in GPUI
32
33Platform-specific scheduler implementations **should remain in GPUI's platform modules**:
34
35- **MacDispatcher**: Should implement `Scheduler` trait, uses GCD APIs
36- **LinuxDispatcher**: Should implement `Scheduler` trait, uses thread pools + calloop
37- **WindowsDispatcher**: Should implement `Scheduler` trait, uses Windows ThreadPool APIs
38
39**Rationale**: Platform implementations are substantial and deeply integrated with:
40- Platform-specific threading APIs (GCD, Windows ThreadPool, etc.)
41- GPUI's event loop integration (main thread messaging)
42- Platform-specific performance optimizations
43
44The shared crate provides only the trait definition and generic helpers, while platform-specific dispatchers implement the `Scheduler` trait directly in GPUI. **Wrappers handle delegation and thread safety.**
45
46### BackgroundExecutor Integration
47
48GPUI's executors now use wrappers:
49
50```rust
51// crates/gpui/src/executor.rs
52pub struct BackgroundExecutor {
53 executor: Executor, // Generic wrapper for background tasks (Send futures)
54}
55
56impl BackgroundExecutor {
57 pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
58 Self { executor: Executor::new(scheduler) }
59 }
60
61 // Core spawning methods via wrapper delegation
62 pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
63 where R: Send + 'static {
64 self.executor.spawn(future)
65 }
66
67 pub fn spawn_labeled<R>(
68 &self,
69 label: TaskLabel,
70 future: impl Future<Output = R> + Send + 'static
71 ) -> Task<R>
72 where R: Send + 'static {
73 self.executor.spawn_labeled(label, future)
74 }
75
76 // Timer functionality via wrapper
77 pub fn timer(&self, duration: Duration) -> Task<()> {
78 self.executor.timer(duration)
79 }
80
81 // Test-specific methods via downcast in wrapper
82 pub fn deprioritize(&self, label: TaskLabel) {
83 self.executor.deprioritize(label);
84 }
85
86 pub fn tick(&self) -> Option<bool> {
87 self.executor.tick()
88 }
89}
90```
91
92### ForegroundExecutor Integration
93
94GPUI's foreground executor enforces main-thread usage:
95
96```rust
97// crates/gpui/src/executor.rs
98pub struct ForegroundExecutor {
99 executor: Executor, // Underlying executor for delegation
100 _phantom: PhantomData<Rc<()>>, // Enforces !Send
101 creation_thread_id: ThreadId, // Stored for checks
102}
103
104impl !Send for ForegroundExecutor {} // Explicitly !Send
105
106impl ForegroundExecutor {
107 pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
108 let creation_thread_id = thread::current().id();
109 // Delegate creation to underlying scheduler
110 let _ = Executor::new(scheduler.clone());
111 Ok(Self { executor: Executor::new(scheduler), _phantom: PhantomData, creation_thread_id })
112 }
113
114 // Core spawning for main thread (non-Send futures)
115 pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
116 where R: 'static {
117 if thread::current().id() != self.creation_thread_id {
118 panic!("ForegroundExecutor called off main thread");
119 }
120 // Delegate to scheduler.spawn_foreground via wrapper
121 self.executor.scheduler.spawn_foreground(future)
122 }
123
124 // Timer and test methods same as BackgroundExecutor but with thread checks
125 pub fn timer(&self, duration: Duration) -> Task<()> {
126 if thread::current().id() != self.creation_thread_id {
127 panic!("ForegroundExecutor called off main thread");
128 }
129 self.executor.timer(duration)
130 }
131}
132```
133
134## Cloud Integration
135
136### Session Coordination in SimulatedExecutionContext
137
138Cloud's session coordination logic **should be handled directly within SimulatedExecutionContext**, keeping it close to the ExecutionContext trait implementation and avoiding unnecessary abstraction layers. **Uses ForegroundExecutor for single-threaded consistency and to avoid Send requirements on futures.**
139
140```rust
141// crates/platform_simulator/src/platform.rs
142pub struct SimulatedExecutionContext {
143 fg_executor: ForegroundExecutor, // Single-threaded wrapper for simplicity
144 session_counter: AtomicUsize,
145 sessions: Mutex<HashMap<SessionId, WorkerSession>>,
146 current_session: Mutex<Option<SessionId>>,
147}
148
149#[async_trait(?Send)]
150impl PlatformRuntime for SimulatedExecutionContext {
151 async fn delay(&self, duration: Duration) {
152 // Use wrapper's timer for delay
153 self.fg_executor.timer(duration).await;
154 }
155}
156```
157
158### Wait Until Implementation
159
160Session coordination integrated directly with wrapper's task scheduling:
161
162```rust
163#[async_trait(?Send)]
164impl ExecutionContext for SimulatedExecutionContext {
165 fn wait_until(&self, future: LocalBoxFuture<'static, Result<()>>) -> Result<()> {
166 // 1. Spawn using wrapper (no Send required)
167 let task = self.fg_executor.spawn(async move { future.await })?;
168
169 // 2. Register with session coordination via downcast
170 if let Some(test_sched) = self.fg_executor.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
171 if let Some(session_id) = test_sched.get_current_session() {
172 test_sched.track_task_for_session(task.id(), session_id);
173 test_sched.add_wait_until_task(session_id, task.id());
174 }
175 }
176
177 Ok(())
178 }
179
180 async fn pass_through(&self) -> Result<()> {
181 // Use wrapper's timer for delay
182 self.fg_executor.timer(Duration::from_millis(10)).await;
183 Ok(())
184 }
185}
186```
187
188### Session Coordination Methods
189
190Core session operations handled within SimulatedExecutionContext:
191
192```rust
193impl SimulatedExecutionContext {
194 pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
195 let fg_executor = ForegroundExecutor::new(scheduler)?;
196 Ok(Self {
197 fg_executor,
198 session_counter: AtomicUsize::new(0),
199 sessions: Mutex::new(HashMap::new()),
200 current_session: Mutex::new(None),
201 })
202 }
203
204 pub fn create_session(&self) -> SessionId {
205 let session_counter = self.session_counter.fetch_add(1, Ordering::SeqCst);
206 let session_id = SessionId(session_counter);
207
208 self.sessions.lock().insert(session_id, WorkerSession {
209 spawned_tasks: HashSet::new(),
210 wait_until_task_ids: HashSet::new(),
211 });
212
213 session_id
214 }
215
216 pub fn with_session<F, R>(&self, session_id: SessionId, f: F) -> R
217 where
218 F: FnOnce() -> R,
219 {
220 {
221 let mut current = self.current_session.lock();
222 *current = Some(session_id);
223 }
224
225 let result = f();
226
227 {
228 let mut current = self.current_session.lock();
229 *current = None;
230 }
231
232 result
233 }
234
235 pub fn validate_session_cleanup(&self, session_id: SessionId) -> platform_api::Result<()> {
236 let sessions = self.sessions.lock();
237 if let Some(session) = sessions.get(&session_id) {
238 // Check running tasks using wrapper's TestScheduler access
239 if let Some(test_sched) = self.fg_executor.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
240 let dangling_tasks: Vec<TaskId> = session
241 .spawned_tasks
242 .iter()
243 .filter(|&&task_id| test_sched.is_task_running(task_id))
244 .copied()
245 .collect();
246
247 // Cloud-specific permission check
248 let unauthorized: Vec<_> = dangling_tasks
249 .into_iter()
250 .filter(|task_id| !session.wait_until_task_ids.contains(task_id))
251 .collect();
252
253 if !unauthorized.is_empty() {
254 return Err(platform_api::WorkerError::Other(anyhow!(
255 "Session cleanup failed: {} unauthorized tasks still running",
256 unauthorized.len()
257 )));
258 }
259 }
260 }
261 Ok(())
262 }
263
264 pub fn set_current_session(&self, session_id: SessionId) {
265 *self.current_session.lock() = Some(session_id);
266 }
267
268 pub fn get_current_session(&self) -> Option<SessionId> {
269 *self.current_session.lock()
270 }
271}
272```
273
274### Cloud-Specific Data Structures
275
276Session coordination is Cloud-specific but built on unified scheduler primitives via wrappers.
277
278## Scheduler Trait Definition
279
280```rust
281pub trait Scheduler: Send + Sync {
282 /// Schedule a runnable to be executed (object-safe core functionality)
283 fn schedule(&self, runnable: Runnable);
284
285 /// Schedule a runnable with label for test tracking
286 fn schedule_labeled(&self, runnable: Runnable, label: TaskLabel);
287
288 /// Schedule a runnable on the main thread (optional, defaults to panic)
289 fn schedule_foreground(&self, runnable: Runnable) {
290 panic!("schedule_foreground not supported by this scheduler");
291 }
292
293 /// Schedule a runnable after a delay (object-safe for timers)
294 fn schedule_after(&self, duration: Duration, runnable: Runnable);
295
296 /// Platform integration methods
297 fn park(&self, timeout: Option<Duration>) -> bool;
298 fn unparker(&self) -> Unparker;
299 fn is_main_thread(&self) -> bool;
300 fn now(&self) -> Instant;
301}
302
303
304
305## TestScheduler (Concrete Implementation)
306
307```rust
308pub struct TestScheduler {
309 inner: RefCell<TestSchedulerInner>,
310}
311
312struct TestSchedulerInner {
313 tasks: HashMap<TaskId, TaskState>,
314 task_labels: HashMap<TaskId, TaskLabel>,
315 deprioritized_labels: HashSet<TaskLabel>,
316 deprioritized_queue: VecDeque<(Runnable, TaskId)>,
317 main_thread_queue: VecDeque<Runnable>,
318 delayed: Vec<(Instant, Runnable)>,
319 parker: Parker,
320 is_main_thread: bool,
321 now: Instant,
322 next_task_id: AtomicUsize,
323 rng: StdRng,
324 waiting_tasks: HashSet<TaskId>,
325 parking_allowed: bool,
326 waiting_hint: Option<String>,
327 block_tick_range: std::ops::RangeInclusive<usize>,
328 creation_thread_id: ThreadId, // Added for wrapper checks
329}
330
331impl Scheduler for TestScheduler {
332 fn schedule(&self, runnable: Runnable) {
333 // Implementation as before
334 }
335
336 fn schedule_labeled(&self, runnable: Runnable, label: TaskLabel) {
337 // Implementation as before, with deprioritization
338 }
339
340 fn schedule_foreground(&self, runnable: Runnable) {
341 assert!(thread::current().id() == self.inner.borrow().creation_thread_id, "schedule_foreground called off main thread");
342 // Implementation as before
343 }
344
345 // Other trait methods unchanged
346}
347
348impl TestScheduler {
349 // Test-specific methods (NOT on main trait)
350 pub fn deprioritize(&self, label: TaskLabel) { /* implementation */ }
351 pub fn is_task_running(&self, task_id: TaskId) -> bool { /* implementation */ }
352 pub fn tick(&self) -> bool { /* implementation */ }
353 pub fn run_until_parked(&self) { /* implementation */ }
354 pub fn advance_clock(&self, duration: Duration) { /* implementation */ }
355 pub fn assert_main_thread(&self) { /* implementation */ }
356}
357```
358
359## Generic Spawn Helpers
360
361Generic spawn methods implemented for `dyn Scheduler`, now called by wrappers.
362
363## Migration Strategy
364
365### Phase 1: Core Infrastructure
3661. Define `Scheduler` trait (core methods only)
3672. Implement `TestScheduler` with thread ID tracking
3683. Add wrapper structs `Executor` and `ForegroundExecutor`
3694. Make existing GPUI platform dispatchers implement `Scheduler` trait
3705. Add `as_any()` to `Scheduler` for downcasting
371
372### Phase 2: GPUI Migration
3731. Update GPUI executors to use `Executor` and `ForegroundExecutor` wrappers
3742. Handle downcasting in wrappers for test features
3753. Preserve all existing GPUI functionality
3764. Test deployments use TestScheduler, production uses minimal schedulers
377
378### Phase 3: Cloud Integration
3791. Update `SimulatedExecutionContext` to use `ForegroundExecutor`
3802. Move session coordination logic into `SimulatedExecutionContext`
3813. Integrate `wait_until()` with wrapper scheduling
3824. Use TestScheduler features for session validation via downcast
3835. Preserve all existing Cloud platform APIs
384
385### Phase 4: Testing & Validation
3861. GPUI tests work with new architecture
3872. Cloud session behavior preserved (single-threaded)
3883. Production efficiency maintained
3894. Both domains benefit from unified test infrastructure
390
391## Platform Backend Files
392
393### GPUI Backends
394- `crates/gpui/src/platform/mac/dispatcher.rs` - `MacDispatcher` implements `Scheduler`
395- `crates/gpui/src/platform/linux/dispatcher.rs` - `LinuxDispatcher` implements `Scheduler`
396- `crates/gpui/src/platform/windows/dispatcher.rs` - `WindowsDispatcher` implements `Scheduler`
397- `crates/gpui/src/platform/test/dispatcher.rs` - `TestDispatcher` β `TestScheduler` (moved to shared crate)
398
399### Cloud Backends
400- `crates/platform_simulator/src/platform.rs` - `SimulatedExecutionContext` uses `ForegroundExecutor`
401- `crates/cloudflare_platform/src/execution_context.rs` - Cloudflare-specific ExecutionContext using `ForegroundExecutor`
402
403## Compatibility Checklist
404
405## Complete GPUI + Cloud Feature Coverage β
406
407### GPUI Compatibility
408- β
`spawn()` β `Executor::spawn()` or `ForegroundExecutor::spawn()`
409- β
`spawn_labeled(label)` β Wrappers delegate to `dyn Scheduler::spawn_labeled()`
410- β
`timer(duration)` β Wrappers delegate to `dyn Scheduler::timer()`
411- β
`block(future)` β Wrappers handle with parking
412- β
`block_with_timeout(future, timeout)` β Wrappers handle
413- β
`now()` β `scheduler.now()` (direct trait method)
414- β
`is_main_thread()` β `scheduler.is_main_thread()` (direct trait method)
415- β
`num_cpus()` β Generic helper on wrappers
416- β
`deprioritize(label)` β Downcast in wrappers, then TestScheduler::deprioritize()
417- β
`tick()` β Downcast in wrappers, then TestScheduler::tick()
418- β
`run_until_parked()` β Downcast in wrappers, then TestScheduler::run_until_parked()
419- β
`advance_clock(duration)` β Downcast in wrappers, then TestScheduler::advance_clock()
420- β
`simulate_random_delay()` β Downcast in wrappers, then TestScheduler::simulate_random_delay()
421- β
`BackgroundExecutor` β Uses `Executor` wrapper
422
423### Cloud Compatibility
424- β
**Session Coordination**: `ExecutionContext.wait_until()` via `ForegroundExecutor`
425- β
**Task Lifecycle**: Uses wrapper's TestScheduler access for validation
426- β
**Worker Management**: Session context and cleanup validation
427- β
**Background Tasks**: Explicit permission system for long-running work
428- β
**Deterministic Testing**: Full TestScheduler integration with session tracking
429- β
**Platform Runtime**: `PlatformRuntime.delay()` via wrapper timer
430- β
**Session Validation**: Dangling task detection with proper error reporting
431- β
**Auto-Association**: Tasks automatically linked to sessions during spawn
432
433### Unified Benefits
434- β
**Clean Separation**: GPUI gets deprioritization, Cloud gets session coordination
435- β
**Unified Task Tracking**: Both domains use TestScheduler via wrappers for validation
436- β
**Composability**: Session coordination built on unified scheduling primitives
437- β
**Domain-Specific**: Each domain handles its coordination concerns appropriately
438- β
**Test Infrastructure**: Shared deterministic testing capabilities
439- β
**Production Ready**: Both domains can use minimal platform schedulers
440- β
**Extensible**: New coordination patterns can be added without shared crate changes
441- β
**Thread Safety**: ForegroundExecutor enforces main-thread use across domains
442
443## Implementation Notes
444
445### Key Design Decisions
446
4471. **GPUI**: Uses `Executor` for background (Send), `ForegroundExecutor` for main-thread (!Send)
4482. **Cloud**: Uses `ForegroundExecutor` for single-threaded simplicity (no Send required on futures)
4493. **Shared**: Core scheduling primitives + wrappers for delegation and safety
4504. **Integration**: Both domains use wrappers with consistent API
451
452### Migration Considerations
453
454- **Zero Breaking Changes**: Existing APIs preserved via wrappers
455- **Gradual Migration**: Can migrate GPUI and Cloud independently
456- **Test Preservation**: All existing test functionality maintained
457- **Performance**: Minimal overhead from trait objects in production
458- **Cloud Simplification**: ForegroundExecutor allows non-Send futures in single-threaded context
459
460This architecture provides clean separation between GPUI's UI determinism needs and Cloud's session coordination requirements, while sharing the core task scheduling infrastructure and enforcing thread safety through wrappers.