```
┌──────────────────────────────────────────────────────┐
│                     Shared Crate                     │
├──────────────────────────────────────────────────────┤
│ Scheduler trait:                                     │
│   - Core object-safe interface                       │
│                                                      │
│ TestScheduler:                                       │
│   - Should implement Scheduler + test features       │
│   - deprioritize() - test-only method                │
│   - spawn_labeled() - labels for testing             │
│   - Task lifecycle tracking                          │
│   - creation_thread_id for Foreground checks         │
│                                                      │
│ Executor wrappers:                                   │
│   - Executor: Wraps Arc<dyn Scheduler>, Send futures │
│   - ForegroundExecutor: Wraps Arc<dyn Scheduler>,    │
│     !Send, thread checks                             │
└──────────────────────────────────────────────────────┘
                           ▲
                           │
            ┌──────────────┴──────────┐
            │                         │
┌───────────┴─────────┐   ┌───────────┴─────────┐
│        GPUI         │   │        Cloud        │
│ Uses Executor       │   │ ForegroundExecutor  │
│ + ForegroundExecutor│   │   for single thread │
│ + TestScheduler     │   │ + TestScheduler     │
└─────────────────────┘   └─────────────────────┘
```
## GPUI Integration
### Platform Scheduler Implementations in GPUI
Platform-specific scheduler implementations **should remain in GPUI's platform modules**:
- **MacDispatcher**: Should implement the `Scheduler` trait using GCD APIs
- **LinuxDispatcher**: Should implement the `Scheduler` trait using thread pools + calloop
- **WindowsDispatcher**: Should implement the `Scheduler` trait using the Windows ThreadPool APIs
**Rationale**: Platform implementations are substantial and deeply integrated with:
- Platform-specific threading APIs (GCD, Windows ThreadPool, etc.)
- GPUI's event loop integration (main thread messaging)
- Platform-specific performance optimizations
The shared crate provides only the trait definition and generic helpers, while platform-specific dispatchers implement the `Scheduler` trait directly in GPUI. **Wrappers handle delegation and thread safety.**
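As a concrete illustration of that delegation, here is a minimal sketch of how the shared `Executor` wrapper might turn a future into a `Runnable` and hand it to the trait object. It assumes the `async_task` crate supplies the `Runnable` (as GPUI uses today); the `Task::from` conversion into the shared `Task<R>` type is an assumption for illustration, not the final API.

```rust
use std::{future::Future, sync::Arc};
use async_task::Runnable;

pub struct Executor {
    scheduler: Arc<dyn Scheduler>,
}

impl Executor {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
        Self { scheduler }
    }

    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
    where
        R: Send + 'static,
    {
        let scheduler = self.scheduler.clone();
        // async_task splits the future into a Runnable (poll handle) and a Task
        // (result handle); every wake pushes the Runnable back to the scheduler.
        let (runnable, task) = async_task::spawn(future, move |runnable: Runnable| {
            scheduler.schedule(runnable)
        });
        runnable.schedule();
        Task::from(task) // assumed conversion into the shared Task<R> type
    }
}
```

`ForegroundExecutor` (shown later) follows the same shape but routes work through `schedule_foreground` and refuses calls from threads other than the one it was created on.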
### BackgroundExecutor Integration
GPUI's executors now use wrappers:
```rust
// crates/gpui/src/executor.rs
pub struct BackgroundExecutor {
    executor: Executor, // Generic wrapper for background tasks (Send futures)
}

impl BackgroundExecutor {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
        Self { executor: Executor::new(scheduler) }
    }

    // Core spawning methods via wrapper delegation
    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
    where
        R: Send + 'static,
    {
        self.executor.spawn(future)
    }

    pub fn spawn_labeled<R>(
        &self,
        label: TaskLabel,
        future: impl Future<Output = R> + Send + 'static,
    ) -> Task<R>
    where
        R: Send + 'static,
    {
        self.executor.spawn_labeled(label, future)
    }

    // Timer functionality via wrapper
    pub fn timer(&self, duration: Duration) -> Task<()> {
        self.executor.timer(duration)
    }

    // Test-specific methods via downcast in the wrapper
    pub fn deprioritize(&self, label: TaskLabel) {
        self.executor.deprioritize(label);
    }

    pub fn tick(&self) -> Option<bool> {
        self.executor.tick()
    }
}
```
### ForegroundExecutor Integration
GPUI's foreground executor enforces main-thread usage:
```rust
// crates/gpui/src/executor.rs
pub struct ForegroundExecutor {
    executor: Executor,            // Underlying executor for delegation
    _phantom: PhantomData<Rc<()>>, // Rc is !Send, so this struct is !Send
    creation_thread_id: ThreadId,  // Stored for main-thread checks
}

impl ForegroundExecutor {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
        Ok(Self {
            executor: Executor::new(scheduler),
            _phantom: PhantomData,
            creation_thread_id: thread::current().id(),
        })
    }

    // Core spawning for the main thread (non-Send futures)
    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
    where
        R: 'static,
    {
        if thread::current().id() != self.creation_thread_id {
            panic!("ForegroundExecutor called off main thread");
        }
        // Delegate to the generic spawn_foreground helper on dyn Scheduler
        self.executor.scheduler.spawn_foreground(future)
    }

    // Timer and test methods mirror BackgroundExecutor, but with thread checks
    pub fn timer(&self, duration: Duration) -> Task<()> {
        if thread::current().id() != self.creation_thread_id {
            panic!("ForegroundExecutor called off main thread");
        }
        self.executor.timer(duration)
    }
}
```
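A hypothetical usage sketch, only to show the `!Send` property in action: because `ForegroundExecutor::spawn` has no `Send` bound, the future may capture `Rc`/`RefCell` state, which is exactly what Cloud's single-threaded context relies on below. The function name is an assumption for illustration.

```rust
use std::{cell::RefCell, rc::Rc, sync::Arc};

fn main_thread_counter(scheduler: Arc<dyn Scheduler>) {
    let foreground = ForegroundExecutor::new(scheduler).expect("created on the main thread");

    let counter = Rc::new(RefCell::new(0));
    let counter_in_task = counter.clone();

    // The captured Rc is !Send; this compiles only because spawn has no Send bound.
    let _task = foreground.spawn(async move {
        *counter_in_task.borrow_mut() += 1;
    });
}
```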
## Cloud Integration
### Session Coordination in SimulatedExecutionContext
Cloud's session coordination logic should be handled directly within `SimulatedExecutionContext`, keeping it close to the `ExecutionContext` trait implementation and avoiding unnecessary abstraction layers. It uses `ForegroundExecutor` for single-threaded consistency and to avoid `Send` requirements on futures.
```rust
// crates/platform_simulator/src/platform.rs
pub struct SimulatedExecutionContext {
    fg_executor: ForegroundExecutor, // Single-threaded wrapper for simplicity
    session_counter: AtomicUsize,
    sessions: Mutex<HashMap<SessionId, WorkerSession>>,
    current_session: Mutex<Option<SessionId>>,
}

#[async_trait(?Send)]
impl PlatformRuntime for SimulatedExecutionContext {
    async fn delay(&self, duration: Duration) {
        // Use the wrapper's timer for delays
        self.fg_executor.timer(duration).await;
    }
}
```
### Wait Until Implementation
Session coordination is integrated directly with the wrapper's task scheduling:
```rust
#[async_trait(?Send)]
impl ExecutionContext for SimulatedExecutionContext {
    fn wait_until(&self, future: LocalBoxFuture<'static, Result<()>>) -> Result<()> {
        // 1. Spawn using the wrapper (no Send bound required)
        let task = self.fg_executor.spawn(async move { future.await });

        // 2. Register the task with the current session's coordination state
        if let Some(session_id) = self.get_current_session() {
            let mut sessions = self.sessions.lock();
            if let Some(session) = sessions.get_mut(&session_id) {
                session.spawned_tasks.insert(task.id());
                session.wait_until_task_ids.insert(task.id());
            }
        }
        Ok(())
    }

    async fn pass_through(&self) -> Result<()> {
        // Use the wrapper's timer to yield briefly
        self.fg_executor.timer(Duration::from_millis(10)).await;
        Ok(())
    }
}
```
### Session Coordination Methods
Core session operations are handled within `SimulatedExecutionContext`:
```rust
impl SimulatedExecutionContext {
    pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
        let fg_executor = ForegroundExecutor::new(scheduler)?;
        Ok(Self {
            fg_executor,
            session_counter: AtomicUsize::new(0),
            sessions: Mutex::new(HashMap::new()),
            current_session: Mutex::new(None),
        })
    }

    pub fn create_session(&self) -> SessionId {
        let session_counter = self.session_counter.fetch_add(1, Ordering::SeqCst);
        let session_id = SessionId(session_counter);
        self.sessions.lock().insert(session_id, WorkerSession {
            spawned_tasks: HashSet::new(),
            wait_until_task_ids: HashSet::new(),
        });
        session_id
    }

    pub fn with_session<F, R>(&self, session_id: SessionId, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        {
            let mut current = self.current_session.lock();
            *current = Some(session_id);
        }
        let result = f();
        {
            let mut current = self.current_session.lock();
            *current = None;
        }
        result
    }

    pub fn validate_session_cleanup(&self, session_id: SessionId) -> platform_api::Result<()> {
        let sessions = self.sessions.lock();
        if let Some(session) = sessions.get(&session_id) {
            // Check running tasks using the wrapper's TestScheduler access
            if let Some(test_sched) = self
                .fg_executor
                .executor
                .scheduler
                .as_any()
                .downcast_ref::<TestScheduler>()
            {
                let dangling_tasks: Vec<TaskId> = session
                    .spawned_tasks
                    .iter()
                    .filter(|&&task_id| test_sched.is_task_running(task_id))
                    .copied()
                    .collect();

                // Cloud-specific permission check
                let unauthorized: Vec<_> = dangling_tasks
                    .into_iter()
                    .filter(|task_id| !session.wait_until_task_ids.contains(task_id))
                    .collect();

                if !unauthorized.is_empty() {
                    return Err(platform_api::WorkerError::Other(anyhow!(
                        "Session cleanup failed: {} unauthorized tasks still running",
                        unauthorized.len()
                    )));
                }
            }
        }
        Ok(())
    }

    pub fn set_current_session(&self, session_id: SessionId) {
        *self.current_session.lock() = Some(session_id);
    }

    pub fn get_current_session(&self) -> Option<SessionId> {
        *self.current_session.lock()
    }
}
```
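A hypothetical usage sketch of the session API above: run one simulated request inside a session, then assert that nothing unauthorized outlived it. The function name and `expect`-based error handling are assumptions for illustration.

```rust
use std::sync::Arc;

fn simulate_one_request(scheduler: Arc<dyn Scheduler>) {
    let ctx = SimulatedExecutionContext::new(scheduler).expect("context creation");
    let session = ctx.create_session();

    ctx.with_session(session, || {
        // Handle the request here; tasks registered via wait_until are tracked
        // against `session` while it is current.
    });

    ctx.validate_session_cleanup(session)
        .expect("no unauthorized tasks should outlive the session");
}
```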
### Cloud-Specific Data Structures
Session coordination is Cloud-specific, but it is built on the unified scheduler primitives via the wrappers.
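The session types referenced by the code above are not spelled out in this section; a minimal sketch, with field names taken from their usage in `create_session` and `validate_session_cleanup`, might look like this:

```rust
use std::collections::HashSet;

/// Identifies one worker session (a monotonically increasing counter).
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub struct SessionId(pub usize);

/// Per-session bookkeeping owned by SimulatedExecutionContext.
pub struct WorkerSession {
    /// Every task registered while this session was current.
    pub spawned_tasks: HashSet<TaskId>,
    /// Tasks explicitly allowed to keep running after the request via wait_until.
    pub wait_until_task_ids: HashSet<TaskId>,
}
```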
## Scheduler Trait Definition
```rust
pub trait Scheduler: Send + Sync {
    /// Schedule a runnable to be executed (object-safe core functionality)
    fn schedule(&self, runnable: Runnable);

    /// Schedule a runnable with a label for test tracking
    fn schedule_labeled(&self, runnable: Runnable, label: TaskLabel);

    /// Schedule a runnable on the main thread (optional, defaults to panic)
    fn schedule_foreground(&self, runnable: Runnable) {
        panic!("schedule_foreground not supported by this scheduler");
    }

    /// Schedule a runnable after a delay (object-safe for timers)
    fn schedule_after(&self, duration: Duration, runnable: Runnable);

    /// Downcasting hook so wrappers can reach test-only TestScheduler features
    fn as_any(&self) -> &dyn Any;

    /// Platform integration methods
    fn park(&self, timeout: Option<Duration>) -> bool;
    fn unparker(&self) -> Unparker;
    fn is_main_thread(&self) -> bool;
    fn now(&self) -> Instant;
}
```
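To make the trait's surface concrete, here is a minimal sketch of a production-style implementation built only on `std::thread` and the `parking` crate. It is not GPUI's actual MacDispatcher/LinuxDispatcher, and the type name is an assumption; real platform dispatchers replace the thread-per-runnable bodies with GCD, calloop, or the Windows ThreadPool.

```rust
use std::any::Any;
use std::sync::Mutex;
use std::thread;
use std::time::{Duration, Instant};
use async_task::Runnable;
use parking::{Parker, Unparker};

pub struct NaiveThreadScheduler {
    parker: Mutex<Parker>, // Parker is not Sync, so guard it to satisfy Send + Sync
    unparker: Unparker,
    main_thread_id: thread::ThreadId,
}

impl NaiveThreadScheduler {
    pub fn new() -> Self {
        let parker = Parker::new();
        let unparker = parker.unparker();
        Self {
            parker: Mutex::new(parker),
            unparker,
            main_thread_id: thread::current().id(),
        }
    }
}

impl Scheduler for NaiveThreadScheduler {
    fn schedule(&self, runnable: Runnable) {
        // Thread-per-runnable keeps the sketch short; real schedulers reuse a pool.
        thread::spawn(move || {
            runnable.run();
        });
    }

    fn schedule_labeled(&self, runnable: Runnable, _label: TaskLabel) {
        // Labels only matter to the TestScheduler; production ignores them.
        self.schedule(runnable);
    }

    fn schedule_after(&self, duration: Duration, runnable: Runnable) {
        thread::spawn(move || {
            thread::sleep(duration);
            runnable.run();
        });
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn park(&self, timeout: Option<Duration>) -> bool {
        let parker = self.parker.lock().unwrap();
        match timeout {
            Some(timeout) => parker.park_timeout(timeout),
            None => {
                parker.park();
                true
            }
        }
    }

    fn unparker(&self) -> Unparker {
        self.unparker.clone()
    }

    fn is_main_thread(&self) -> bool {
        thread::current().id() == self.main_thread_id
    }

    fn now(&self) -> Instant {
        Instant::now()
    }
}
```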
## TestScheduler (Concrete Implementation)
```rust
pub struct TestScheduler {
    // Mutex rather than RefCell: `Scheduler` requires Send + Sync
    inner: Mutex<TestSchedulerInner>,
}

struct TestSchedulerInner {
    tasks: HashMap<TaskId, TaskState>,
    task_labels: HashMap<TaskId, TaskLabel>,
    deprioritized_labels: HashSet<TaskLabel>,
    deprioritized_queue: VecDeque<(Runnable, TaskId)>,
    main_thread_queue: VecDeque<Runnable>,
    delayed: Vec<(Instant, Runnable)>,
    parker: Parker,
    is_main_thread: bool,
    now: Instant,
    next_task_id: AtomicUsize,
    rng: StdRng,
    waiting_tasks: HashSet<TaskId>,
    parking_allowed: bool,
    waiting_hint: Option<String>,
    block_tick_range: std::ops::RangeInclusive<usize>,
    creation_thread_id: ThreadId, // Added for wrapper foreground checks
}

impl Scheduler for TestScheduler {
    fn schedule(&self, runnable: Runnable) {
        // Implementation as before
    }

    fn schedule_labeled(&self, runnable: Runnable, label: TaskLabel) {
        // Implementation as before, with deprioritization
    }

    fn schedule_foreground(&self, runnable: Runnable) {
        assert!(
            thread::current().id() == self.inner.lock().creation_thread_id,
            "schedule_foreground called off main thread"
        );
        // Implementation as before
    }

    // Other trait methods unchanged
}

impl TestScheduler {
    // Test-specific methods (NOT on the Scheduler trait)
    pub fn deprioritize(&self, label: TaskLabel) { /* implementation */ }
    pub fn is_task_running(&self, task_id: TaskId) -> bool { /* implementation */ }
    pub fn tick(&self) -> bool { /* implementation */ }
    pub fn run_until_parked(&self) { /* implementation */ }
    pub fn advance_clock(&self, duration: Duration) { /* implementation */ }
    pub fn assert_main_thread(&self) { /* implementation */ }
}
```
## Generic Spawn Helpers
Generic spawn helpers are implemented for `dyn Scheduler` and are now called by the wrappers.
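As one example of this family of helpers (the compatibility checklist below refers to `dyn Scheduler::timer()` and `dyn Scheduler::spawn_labeled()`), a timer can be written once against `schedule_after`. The free-function form and the `Task::from` conversion are assumptions for illustration.

```rust
use std::{sync::Arc, time::Duration};
use async_task::Runnable;

pub fn timer(scheduler: &Arc<dyn Scheduler>, duration: Duration) -> Task<()> {
    let scheduler = scheduler.clone();
    // The empty future completes on its first poll, so delaying that poll with
    // schedule_after turns the task into a one-shot timer.
    let (runnable, task) = async_task::spawn(async {}, move |runnable: Runnable| {
        scheduler.schedule_after(duration, runnable)
    });
    runnable.schedule();
    Task::from(task) // assumed conversion into the shared Task<()> type
}
```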
## Migration Strategy
### Phase 1: Core Infrastructure
- Define the `Scheduler` trait (core methods only)
- Implement `TestScheduler` with thread ID tracking
- Add the wrapper structs `Executor` and `ForegroundExecutor`
- Make existing GPUI platform dispatchers implement the `Scheduler` trait
- Add `as_any()` to `Scheduler` for downcasting
### Phase 2: GPUI Migration
- Update GPUI executors to use the `Executor` and `ForegroundExecutor` wrappers
- Handle downcasting in the wrappers for test features
- Preserve all existing GPUI functionality
- Test deployments use `TestScheduler`; production uses minimal schedulers
### Phase 3: Cloud Integration
- Update `SimulatedExecutionContext` to use `ForegroundExecutor`
- Move session coordination logic into `SimulatedExecutionContext`
- Integrate `wait_until()` with wrapper scheduling
- Use `TestScheduler` features for session validation via downcast
- Preserve all existing Cloud platform APIs
### Phase 4: Testing & Validation
- GPUI tests work with new architecture
- Cloud session behavior preserved (single-threaded)
- Production efficiency maintained
- Both domains benefit from unified test infrastructure (see the sketch after this list)
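A hypothetical end-state test for Phase 4, showing the payoff of the shared infrastructure: one `TestScheduler` instance drives both a GPUI `BackgroundExecutor` and a Cloud `SimulatedExecutionContext` deterministically. The `TestScheduler::default()` constructor and the cross-crate wiring are assumptions; the rest uses only APIs described above.

```rust
use std::sync::Arc;

#[test]
fn gpui_and_cloud_share_one_scheduler() {
    let test_scheduler = Arc::new(TestScheduler::default()); // assumed constructor
    let scheduler: Arc<dyn Scheduler> = test_scheduler.clone();

    let background = BackgroundExecutor::new(scheduler.clone());
    let cloud = SimulatedExecutionContext::new(scheduler).expect("context creation");

    let _gpui_work = background.spawn(async { 2 + 2 });
    let session = cloud.create_session();

    // One deterministic pump drains queued work from both domains.
    test_scheduler.run_until_parked();
    cloud.validate_session_cleanup(session).expect("session should be clean");
}
```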
## Platform Backend Files
### GPUI Backends
- `crates/gpui/src/platform/mac/dispatcher.rs` - `MacDispatcher` implements `Scheduler`
- `crates/gpui/src/platform/linux/dispatcher.rs` - `LinuxDispatcher` implements `Scheduler`
- `crates/gpui/src/platform/windows/dispatcher.rs` - `WindowsDispatcher` implements `Scheduler`
- `crates/gpui/src/platform/test/dispatcher.rs` - `TestDispatcher` → `TestScheduler` (moved to the shared crate)
### Cloud Backends
- `crates/platform_simulator/src/platform.rs` - `SimulatedExecutionContext` uses `ForegroundExecutor`
- `crates/cloudflare_platform/src/execution_context.rs` - Cloudflare-specific `ExecutionContext` using `ForegroundExecutor`
## Compatibility Checklist
**Complete GPUI + Cloud Feature Coverage** ✅
### GPUI Compatibility
- ✅ `spawn()` → `Executor::spawn()` or `ForegroundExecutor::spawn()`
- ✅ `spawn_labeled(label)` → Wrappers delegate to `dyn Scheduler::spawn_labeled()`
- ✅ `timer(duration)` → Wrappers delegate to `dyn Scheduler::timer()`
- ✅ `block(future)` → Wrappers handle with parking
- ✅ `block_with_timeout(future, timeout)` → Wrappers handle
- ✅ `now()` → `scheduler.now()` (direct trait method)
- ✅ `is_main_thread()` → `scheduler.is_main_thread()` (direct trait method)
- ✅ `num_cpus()` → Generic helper on wrappers
- ✅ `deprioritize(label)` → Downcast in wrappers, then `TestScheduler::deprioritize()` (see the sketch after this list)
- ✅ `tick()` → Downcast in wrappers, then `TestScheduler::tick()`
- ✅ `run_until_parked()` → Downcast in wrappers, then `TestScheduler::run_until_parked()`
- ✅ `advance_clock(duration)` → Downcast in wrappers, then `TestScheduler::advance_clock()`
- ✅ `simulate_random_delay()` → Downcast in wrappers, then `TestScheduler::simulate_random_delay()`
- ✅ `BackgroundExecutor` → Uses the `Executor` wrapper
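The sketch referenced above: a hedged guess at how a wrapper's test-only method might reach `TestScheduler` through `as_any()`, mirroring the downcast Cloud already performs in `validate_session_cleanup`. On a production scheduler the downcast fails and the call becomes a no-op.

```rust
impl Executor {
    pub fn deprioritize(&self, label: TaskLabel) {
        // Only the TestScheduler understands deprioritization; production
        // schedulers simply don't match the downcast.
        if let Some(test_scheduler) = self.scheduler.as_any().downcast_ref::<TestScheduler>() {
            test_scheduler.deprioritize(label);
        }
    }
}
```

`tick()`, `run_until_parked()`, and `advance_clock()` would follow the same pattern.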
### Cloud Compatibility
- ✅ Session Coordination: `ExecutionContext.wait_until()` via `ForegroundExecutor`
- ✅ Task Lifecycle: Uses the wrapper's `TestScheduler` access for validation
- ✅ Worker Management: Session context and cleanup validation
- ✅ Background Tasks: Explicit permission system for long-running work
- ✅ Deterministic Testing: Full `TestScheduler` integration with session tracking
- ✅ Platform Runtime: `PlatformRuntime.delay()` via the wrapper timer
- ✅ Session Validation: Dangling task detection with proper error reporting
- ✅ Auto-Association: Tasks automatically linked to sessions during spawn
### Unified Benefits
- ✅ Clean Separation: GPUI gets deprioritization, Cloud gets session coordination
- ✅ Unified Task Tracking: Both domains use `TestScheduler` via wrappers for validation
- ✅ Composability: Session coordination built on unified scheduling primitives
- ✅ Domain-Specific: Each domain handles its coordination concerns appropriately
- ✅ Test Infrastructure: Shared deterministic testing capabilities
- ✅ Production Ready: Both domains can use minimal platform schedulers
- ✅ Extensible: New coordination patterns can be added without shared-crate changes
- ✅ Thread Safety: `ForegroundExecutor` enforces main-thread use across domains
## Implementation Notes
### Key Design Decisions
- GPUI: Uses `Executor` for background work (Send futures), `ForegroundExecutor` for the main thread (!Send)
- Cloud: Uses `ForegroundExecutor` for single-threaded simplicity (no Send required on futures)
- Shared: Core scheduling primitives plus wrappers for delegation and safety
- Integration: Both domains use the wrappers with a consistent API
### Migration Considerations
- Zero Breaking Changes: Existing APIs preserved via wrappers
- Gradual Migration: GPUI and Cloud can be migrated independently
- Test Preservation: All existing test functionality maintained
- Performance: Minimal overhead from trait objects in production
- Cloud Simplification: `ForegroundExecutor` allows non-Send futures in a single-threaded context
This architecture provides clean separation between GPUI's UI determinism needs and Cloud's session coordination requirements, while sharing the core task scheduling infrastructure and enforcing thread safety through wrappers.