Update scheduler architecture plans with Executor and ForegroundExecutor wrappers

Nathan Sobo created

Change summary

thoughts/2025-08-31_15-47-17_test-scheduler-design.md          | 139 +
thoughts/2025-08-31_15-47-17_unified-scheduler-architecture.md | 409 +--
2 files changed, 247 insertions(+), 301 deletions(-)

Detailed changes

thoughts/2025-08-31_15-47-17_test-scheduler-design.md πŸ”—

@@ -1,24 +1,16 @@
-<file_path>
-zed/thoughts/2025-08-31_15-47-17_test-scheduler-design.md
-</file_path>
-
-<edit_description>
-Updating TestScheduler to use Mutex instead of RwLock and fix lock dropping in tick_internal for single-threaded safety
-</edit_description>
-
 # TestScheduler Design Details
 
-This document expands on the [Unified Scheduler Architecture Plan](2025-08-31_15-47-17_unified-scheduler-architecture.md) by providing a detailed design and complete implementation of the TestScheduler. It assumes familiarity with the broader architecture, including the shared `Scheduler` trait, domain separation (GPUI vs. Cloud), and multi-threading test scenarios.
+This document expands on the [Unified Scheduler Architecture Plan](2025-08-31_15-47-17_unified-scheduler-architecture.md) by providing a detailed design and complete implementation of the TestScheduler. It assumes familiarity with the broader architecture, including the shared `Scheduler` trait, domain separation (GPUI vs. Cloud), and multi-threading test scenarios. **Updates incorporate Executor and ForegroundExecutor wrappers around Arc<dyn Scheduler>, with ForegroundExecutor using PhantomData<Rc<()>> for !Send and panicking if not on the creation thread, applied to both GPUI and Cloud for consistency and simplicity.**
 
 ## Overview
 
 The TestScheduler is the **single concrete test implementation** of the `Scheduler` trait (see Section 3: Scheduler Trait Definition in the original plan). It serves as the unified core for all test scenarios, enabling:
 
 - **GPUI Testing**: Deterministic UI scheduling with task labels, deprioritization, main thread isolation, and tick-based execution (see Section 4.1: GPUI Integration in the original plan).
-- **Cloud Testing**: Session coordination, time-range delays, wait-until task tracking, and cleanup validation (see Section 5: Cloud Integration in the original plan).
+- **Cloud Testing**: Session coordination, time-range delays, wait-until task tracking, and cleanup validation (see Section 5: Cloud Integration in the original plan). **ForegroundExecutor is now used in Cloud for single-threaded simplicity, avoiding Send requirements on futures.**
 - **Unified Testing**: Shared across test threads for client-cloud interactions, seeded randomness, and task lifecycle management.
 
-There is **no separate TestScheduler trait**β€”all test-specific methods are directly on the TestScheduler struct for simplicity, as it is the only implementation in the test context.
+There is **no separate TestScheduler trait**β€”all test-specific methods are directly on the TestScheduler struct for simplicity, as it is the only implementation in the test context. **Executors wrap Arc<dyn Scheduler>, with ForegroundExecutor enforcing thread safety via phantom Rc and creation-thread checks.**
 
 ## Design Principles
 
@@ -26,7 +18,8 @@ There is **no separate TestScheduler trait**β€”all test-specific methods are dir
 - **Merged Capabilities**: Combines GPUI queues with Cloud session logic in a single state machine.
 - **Determinism**: Always seeded with configurable randomization.
 - **Multi-threading Ready**: `Arc` and `Mutex` for shared access in collaborative tests.
-- **Domain Wrappers**: GPUI/Cloud test code wraps this core for specific APIs (e.g., GPUI's BackgroundExecutor).
+- **Domain Wrappers**: GPUI/Cloud test code wraps this core for specific APIs (e.g., GPUI's BackgroundExecutor now uses Executor and ForegroundExecutor).
+- **Thread Safety Enforcement**: ForegroundExecutor uses phantom Rc for !Send and checks against creation thread for main-thread isolation.
 
 ## Complete TestScheduler Implementation
 
@@ -38,6 +31,7 @@ use std::pin::Pin;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 use std::task::{Context, Poll, Waker};
+use std::thread;
 use std::time::{Duration, Instant};
 
 use anyhow::{anyhow, Result};
@@ -179,13 +173,13 @@ struct SchedulerState {
     parking_allowed: AtomicBool,
     execution_history: Vec<String>,
     fuzz_inputs: Option<FuzzedSchedulerInputs>,
+    creation_thread_id: thread::ThreadId,  // Added for thread safety checks
 }
 
 // Concrete implementation
 pub struct TestScheduler {
     state: Arc<Mutex<SchedulerState>>,
 }
-```
 
 impl TestScheduler {
     /// Primary constructor: Create a scheduler from full configuration.
@@ -215,10 +209,19 @@ impl TestScheduler {
             parking_allowed: AtomicBool::new(false),
             execution_history: Vec::new(),
             fuzz_inputs,
+            creation_thread_id: thread::current().id(),  // Capture creation thread
         };
         Arc::new(Self { state: Arc::new(Mutex::new(state)) })
     }
 
+    // Added for ForegroundExecutor thread checks
+    pub fn assert_main_thread(&self) {
+        let state = self.state.lock().unwrap();
+        if thread::current().id() != state.creation_thread_id {
+            panic!("ForegroundExecutor method called from wrong thread");
+        }
+    }
+
     /// Convenience helper: Create a seeded scheduler (randomization enabled by default).
     pub fn from_seed(seed: u64) -> Arc<Self> {
         Self::new(SchedulerConfig::seeded(seed))
@@ -378,24 +381,41 @@ impl TestScheduler {
 
 ## GPUI Usage Example
 
-GPUI wraps the TestScheduler to maintain its existing API:
+GPUI wraps the TestScheduler using Executor and ForegroundExecutor:
 
 ```rust
-pub struct BackgroundExecutor {
-    scheduler: Arc<dyn Scheduler>,  // TestScheduler implements Scheduler
+use std::marker::PhantomData;
+use std::rc::Rc;
+
+// Generic Executor for background tasks (Send futures)
+pub struct Executor {
+    scheduler: Arc<dyn Scheduler>,
 }
 
-impl BackgroundExecutor {
+impl Executor {
+    pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
+        Self { scheduler }
+    }
+
+    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
+    where R: Send + 'static {
+        // Delegate to scheduler via downcast for test methods if applicable
+        if let Some(test_sched) = self.scheduler.as_any().downcast_ref::<TestScheduler>() {
+            // Use test_sched methods
+        }
+        self.scheduler.spawn(future)
+    }
+
     pub fn spawn_labeled<R>(
-        &self, 
+        &self,
         label: TaskLabel,
         future: impl Future<Output = R> + Send + 'static
-    ) -> Task<R> {
-        // Direct downcast for GPUI test features
+    ) -> Task<R>
+    where R: Send + 'static {
         if let Some(test_sched) = self.scheduler.as_any().downcast_ref::<TestScheduler>() {
-            test_sched.deprioritize(label);  // Use test method directly
+            test_sched.deprioritize(label);
         }
-        self.scheduler.spawn(future)
+        self.scheduler.spawn_labeled(label, future)
     }
 
     pub fn deprioritize(&self, label: TaskLabel) {
@@ -403,23 +423,81 @@ impl BackgroundExecutor {
             test_sched.deprioritize(label);
         }
     }
+
+    pub fn timer(&self, duration: Duration) -> Task<()> {
+        self.scheduler.timer(duration)
+    }
+
+    pub fn tick(&self) -> Option<bool> {
+        self.scheduler.as_any().downcast_ref::<TestScheduler>().map(|ts| ts.tick(false))
+    }
+}
+
+// ForegroundExecutor for main-thread tasks (!Send futures, thread checks)
+pub struct ForegroundExecutor {
+    executor: Executor,
+    _phantom: PhantomData<Rc<()>>,  // Enforces !Send
+}
+
+impl !Send for ForegroundExecutor {}  // Explicitly !Send
+
+impl ForegroundExecutor {
+    pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
+        let executor = Executor::new(scheduler);
+        // Check thread immediately via scheduler
+        if let Some(test_sched) = executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
+            test_sched.assert_main_thread();
+        } else {
+            // Production: assume created on main thread
+        }
+        Ok(Self { executor, _phantom: PhantomData })
+    }
+
+    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
+    where R: 'static {
+        // Assert thread before delegating
+        if let Some(test_sched) = self.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
+            test_sched.assert_main_thread();
+        }
+        self.executor.scheduler.spawn_foreground(future)
+    }
+
+    pub fn timer(&self, duration: Duration) -> Task<()> {
+        if let Some(test_sched) = self.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
+            test_sched.assert_main_thread();
+        }
+        self.executor.scheduler.timer(duration)
+    }
+
+    // Other methods mirror Executor but with thread checks
 }
 ```
 
 ## Cloud Usage Example
 
-Cloud wraps for session coordination:
+Cloud wraps using ForegroundExecutor for single-threaded simplicity (no Send futures required):
 
 ```rust
 impl SimulatedExecutionContext {
+    pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
+        let fg_executor = ForegroundExecutor::new(scheduler)?;  // Use ForegroundExecutor for thread safety and simplicity
+        Self {
+            executor: fg_executor,
+            session_counter: AtomicUsize::new(0),
+            sessions: Mutex::new(HashMap::new()),
+            current_session: Mutex::new(None),
+        }
+    }
+
     pub fn wait_until(&self, future: LocalBoxFuture<'static, Result<()>>) -> Result<()> {
-        let task = self.scheduler.spawn(async move { future.await })?;
+        let task = self.executor.spawn(async move { future.await })?;
         
-        // Direct use of TestScheduler methods
-        let scheduler = self.scheduler.as_any().downcast_ref::<TestScheduler>().unwrap();
-        if let Some(session_id) = scheduler.get_current_session() {
-            scheduler.track_task_for_session(task.id(), session_id);
-            scheduler.add_wait_until_task(session_id, task.id());
+        // Direct use of TestScheduler methods via downcast from executor
+        if let Some(test_sched) = self.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
+            if let Some(session_id) = test_sched.get_current_session() {
+                test_sched.track_task_for_session(task.id(), session_id);
+                test_sched.add_wait_until_task(session_id, task.id());
+            }
         }
         
         Ok(())
@@ -435,7 +513,8 @@ impl SimulatedExecutionContext {
 - **futures**: For channels.
 - **chrono**: For time ranges (optional).
 - **anyhow**: For errors.
+- **std::thread**: For thread ID comparison.
 
 The scheduler assumes no `dyn Any` is implemented on `Scheduler`; add `fn as_any(&self) -> &dyn std::any::Any;` if needed for downcasting.
 
-This implementation provides the complete unified test core, enabling both GPUI's deterministic UI testing and Cloud's session-aware simulation in a single ~250-line struct.
+This implementation provides the complete unified test core, enabling both GPUI's deterministic UI testing and Cloud's session-aware simulation in a single ~250-line struct, now wrapped by Executors for better encapsulation and thread safety.

thoughts/2025-08-31_15-47-17_unified-scheduler-architecture.md πŸ”—

@@ -1,46 +1,27 @@
-# Unified Scheduler Architecture - Layered Design
-
-## Overview
-
-A clean layered architecture where:
-- **Core**: Basic scheduling interface (`Scheduler` trait) + test-enhanced concrete impl (`TestScheduler`)
-- **GPUI**: Uses trait objects for production safety, test features via `TestScheduler`
-- **Cloud**: Session coordination integrated directly in `SimulatedExecutionContext` using unified scheduler primitives
-
-Key design principles:
-- Main `Scheduler` trait has only essential methods (no test pollution)
-- Test-specific features (deprioritization, task tracking) are `TestScheduler`-specific
-- Production schedulers implement minimal interface
-- Cloud handles session coordination at ExecutionContext layer using unified primitives
-
-## Core Architecture
-
-```
 β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
 β”‚            Shared Crate                 β”‚
 β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
 β”‚ Scheduler trait:                        β”‚
 β”‚ - Core object-safe interface             β”‚
-β”‚ - Platform integration (park, now)      β”‚
 β”‚                                         β”‚
 β”‚ TestScheduler:                          β”‚
 β”‚ - Should implement Scheduler + test features  β”‚
 β”‚ - deprioritize() - test-only method     β”‚
 β”‚ - spawn_labeled() - labels for testing  β”‚
 β”‚ - Task lifecycle tracking               β”‚
+β”‚ - creation_thread_id for Foreground checksβ”‚
 β”‚                                         β”‚
-β”‚ Generic spawn helpers:                  β”‚
-β”‚ - spawn() / spawn_foreground()          β”‚
-β”‚ - timer(), block(), block_with_timeout()β”‚
-β”‚ - Future-based API for trait objects    β”‚
+β”‚ Executor wrappers:                      β”‚
+β”‚ - Executor: Wraps Arc<dyn Scheduler>, Send futuresβ”‚
+β”‚ - ForegroundExecutor: Wraps Arc<dyn Scheduler>, !Send, thread checksβ”‚
 β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                     β–²
           β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”
           β”‚         β”‚         β”‚
 β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”  β”Œβ”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”
 β”‚   GPUI       β”‚  β”‚     Cloud       β”‚
-β”‚ Uses trait   β”‚  β”‚ Session coord. β”‚
-β”‚ objects      β”‚  β”‚ in ExecContext β”‚
+β”‚ Uses Executorβ”‚  β”‚ ForegroundExec β”‚
+β”‚ + Foreground β”‚  β”‚ for single-thrdβ”‚
 β”‚ + TestSchedulerβ”‚  β”‚ + TestSchedulerβ”‚
 β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
 ```
@@ -60,28 +41,27 @@ Platform-specific scheduler implementations **should remain in GPUI's platform m
 - GPUI's event loop integration (main thread messaging)
 - Platform-specific performance optimizations
 
-The shared crate provides only the trait definition and generic helpers, while platform-specific dispatchers implement the `Scheduler` trait directly in GPUI.
+The shared crate provides only the trait definition and generic helpers, while platform-specific dispatchers implement the `Scheduler` trait directly in GPUI. **Wrappers handle delegation and thread safety.**
 
 ### BackgroundExecutor Integration
 
-GPUI's executors will use trait objects for scheduling:
+GPUI's executors now use wrappers:
 
 ```rust
 // crates/gpui/src/executor.rs
 pub struct BackgroundExecutor {
-    scheduler: Arc<dyn Scheduler>,  // Any Scheduler implementation via trait objects
+    executor: Executor,  // Generic wrapper for background tasks (Send futures)
 }
 
 impl BackgroundExecutor {
     pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
-        Self { scheduler }
+        Self { executor: Executor::new(scheduler) }
     }
 
-    // Core spawning methods via generic helpers
+    // Core spawning methods via wrapper delegation
     pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
     where R: Send + 'static {
-        // Generic spawn helper implemented on dyn Scheduler - full Future support
-        self.scheduler.spawn(future)
+        self.executor.spawn(future)
     }
 
     pub fn spawn_labeled<R>(
@@ -90,50 +70,63 @@ impl BackgroundExecutor {
         future: impl Future<Output = R> + Send + 'static
     ) -> Task<R>
     where R: Send + 'static {
-        // Generic spawn_labeled helper implemented on dyn Scheduler
-        self.scheduler.spawn_labeled(label, future)
+        self.executor.spawn_labeled(label, future)
     }
 
-    // Timer functionality via generic helper using schedule_after
+    // Timer functionality via wrapper
     pub fn timer(&self, duration: Duration) -> Task<()> {
-        self.scheduler.timer(duration)
+        self.executor.timer(duration)
     }
 
-    // Test-specific methods via downcast to TestScheduler
+    // Test-specific methods via downcast in wrapper
     pub fn deprioritize(&self, label: TaskLabel) {
-        if let Some(test_scheduler) = self.scheduler.downcast_ref::<TestScheduler>() {
-            test_scheduler.deprioritize(label);
-        } else {
-            // Production: ignore silently
-        }
+        self.executor.deprioritize(label);
     }
 
     pub fn tick(&self) -> Option<bool> {
-        self.scheduler.downcast_ref::<TestScheduler>()
-            .map(|ts| ts.tick())
+        self.executor.tick()
     }
 }
 ```
 
 ### ForegroundExecutor Integration
 
+GPUI's foreground executor enforces main-thread usage:
+
 ```rust
 // crates/gpui/src/executor.rs
 pub struct ForegroundExecutor {
-    scheduler: Rc<dyn Scheduler>,  // Rc for single-threaded use
+    executor: Executor,  // Underlying executor for delegation
+    _phantom: PhantomData<Rc<()>>,  // Enforces !Send
+    creation_thread_id: ThreadId,  // Stored for checks
 }
 
+impl !Send for ForegroundExecutor {}  // Explicitly !Send
+
 impl ForegroundExecutor {
+    pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
+        let creation_thread_id = thread::current().id();
+        // Delegate creation to underlying scheduler
+        let _ = Executor::new(scheduler.clone());
+        Ok(Self { executor: Executor::new(scheduler), _phantom: PhantomData, creation_thread_id })
+    }
+
     // Core spawning for main thread (non-Send futures)
     pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
     where R: 'static {
-        // Generic spawn_foreground helper implemented on dyn Scheduler
-        self.scheduler.spawn_foreground(future)
+        if thread::current().id() != self.creation_thread_id {
+            panic!("ForegroundExecutor called off main thread");
+        }
+        // Delegate to scheduler.spawn_foreground via wrapper
+        self.executor.scheduler.spawn_foreground(future)
     }
 
-    // Timer and test methods same as BackgroundExecutor
+    // Timer and test methods same as BackgroundExecutor but with thread checks
     pub fn timer(&self, duration: Duration) -> Task<()> {
-        self.scheduler.timer(duration)
+        if thread::current().id() != self.creation_thread_id {
+            panic!("ForegroundExecutor called off main thread");
+        }
+        self.executor.timer(duration)
     }
 }
 ```
@@ -142,12 +135,12 @@ impl ForegroundExecutor {
 
 ### Session Coordination in SimulatedExecutionContext
 
-Cloud's session coordination logic **should be handled directly within SimulatedExecutionContext**, keeping it close to the ExecutionContext trait implementation and avoiding unnecessary abstraction layers:
+Cloud's session coordination logic **should be handled directly within SimulatedExecutionContext**, keeping it close to the ExecutionContext trait implementation and avoiding unnecessary abstraction layers. **Uses ForegroundExecutor for single-threaded consistency and to avoid Send requirements on futures.**
 
 ```rust
 // crates/platform_simulator/src/platform.rs
 pub struct SimulatedExecutionContext {
-    scheduler: Arc<dyn Scheduler>,  // Unified scheduler via composition
+    fg_executor: ForegroundExecutor,  // Single-threaded wrapper for simplicity
     session_counter: AtomicUsize,
     sessions: Mutex<HashMap<SessionId, WorkerSession>>,
     current_session: Mutex<Option<SessionId>>,
@@ -156,32 +149,28 @@ pub struct SimulatedExecutionContext {
 #[async_trait(?Send)]
 impl PlatformRuntime for SimulatedExecutionContext {
     async fn delay(&self, duration: Duration) {
-        // Use unified scheduler's delay mechanism through timer
-        self.scheduler.timer(duration).await;
+        // Use wrapper's timer for delay
+        self.fg_executor.timer(duration).await;
     }
 }
 ```
 
 ### Wait Until Implementation
 
-Session coordination integrated directly with unified task scheduling:
+Session coordination integrated directly with wrapper's task scheduling:
 
 ```rust
 #[async_trait(?Send)]
 impl ExecutionContext for SimulatedExecutionContext {
     fn wait_until(&self, future: LocalBoxFuture<'static, Result<()>>) -> Result<()> {
-        // 1. Spawn using unified scheduler
-        let task_id = self.scheduler.spawn(async move {
-            // Add delay via scheduler timer for deterministic simulation
-            self.scheduler.timer(Duration::from_millis(10)).await;
-            let _ = future.await;
-        })?;
-
-        // 2. Register with session coordination (direct access)
-        if let Some(session_id) = *self.current_session.lock() {
-            if let Some(session) = self.sessions.lock().get_mut(&session_id) {
-                session.wait_until_task_ids.insert(task_id);
-                self.link_task_to_session(task_id, session_id);
+        // 1. Spawn using wrapper (no Send required)
+        let task = self.fg_executor.spawn(async move { future.await })?;
+        
+        // 2. Register with session coordination via downcast
+        if let Some(test_sched) = self.fg_executor.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
+            if let Some(session_id) = test_sched.get_current_session() {
+                test_sched.track_task_for_session(task.id(), session_id);
+                test_sched.add_wait_until_task(session_id, task.id());
             }
         }
 
@@ -189,8 +178,8 @@ impl ExecutionContext for SimulatedExecutionContext {
     }
 
     async fn pass_through(&self) -> Result<()> {
-        // Use unified scheduler's timer for delay
-        self.scheduler.timer(Duration::from_millis(10)).await;
+        // Use wrapper's timer for delay
+        self.fg_executor.timer(Duration::from_millis(10)).await;
         Ok(())
     }
 }
@@ -202,13 +191,14 @@ Core session operations handled within SimulatedExecutionContext:
 
 ```rust
 impl SimulatedExecutionContext {
-    pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
-        Self {
-            scheduler,
+    pub fn new(scheduler: Arc<dyn Scheduler>) -> Result<Self> {
+        let fg_executor = ForegroundExecutor::new(scheduler)?;
+        Ok(Self {
+            fg_executor,
             session_counter: AtomicUsize::new(0),
             sessions: Mutex::new(HashMap::new()),
             current_session: Mutex::new(None),
-        }
+        })
     }
 
     pub fn create_session(&self) -> SessionId {
@@ -245,60 +235,32 @@ impl SimulatedExecutionContext {
     pub fn validate_session_cleanup(&self, session_id: SessionId) -> platform_api::Result<()> {
         let sessions = self.sessions.lock();
         if let Some(session) = sessions.get(&session_id) {
-            // Check running tasks using unified scheduler's task tracking
-            let dangling_tasks: Vec<TaskId> = session
-                .spawned_tasks
-                .iter()
-                .filter(|&&task_id| self.scheduler.is_task_running(task_id))
-                .copied()
-                .collect();
-
-            // Cloud-specific permission check
-            let unauthorized: Vec<_> = dangling_tasks
-                .into_iter()
-                .filter(|task_id| !session.wait_until_task_ids.contains(task_id))
-                .collect();
-
-            if !unauthorized.is_empty() {
-                return Err(platform_api::WorkerError::Other(anyhow!(
-                    "Session cleanup failed: {} unauthorized tasks still running",
-                    unauthorized.len()
-                )));
+            // Check running tasks using wrapper's TestScheduler access
+            if let Some(test_sched) = self.fg_executor.executor.scheduler.as_any().downcast_ref::<TestScheduler>() {
+                let dangling_tasks: Vec<TaskId> = session
+                    .spawned_tasks
+                    .iter()
+                    .filter(|&&task_id| test_sched.is_task_running(task_id))
+                    .copied()
+                    .collect();
+
+                // Cloud-specific permission check
+                let unauthorized: Vec<_> = dangling_tasks
+                    .into_iter()
+                    .filter(|task_id| !session.wait_until_task_ids.contains(task_id))
+                    .collect();
+
+                if !unauthorized.is_empty() {
+                    return Err(platform_api::WorkerError::Other(anyhow!(
+                        "Session cleanup failed: {} unauthorized tasks still running",
+                        unauthorized.len()
+                    )));
+                }
             }
         }
         Ok(())
     }
 
-    // Link tasks to sessions during spawning
-    fn link_task_to_session(&self, task_id: TaskId, session_id: SessionId) {
-        if let Some(session) = self.sessions.lock().get_mut(&session_id) {
-            session.spawned_tasks.insert(task_id);
-        }
-    }
-
-    fn spawn_with_session(&self, future: Pin<Box<dyn Future<Output = ()>>>) -> TaskId {
-        let task_id = self.scheduler.spawn(future)?;
-
-        // Auto-associate with current session
-        if let Some(session_id) = *self.current_session.lock() {
-            self.link_task_to_session(task_id, session_id);
-        }
-
-        Ok(task_id)
-    }
-}
-```
-
-### Cloud-Specific Data Structures
-
-```rust
-// Session coordination is Cloud-specific but built on unified scheduler
-pub struct WorkerSession {
-    spawned_tasks: HashSet<TaskId>,        // Tracks tasks in session
-    wait_until_task_ids: HashSet<TaskId>,  // Explicitly allowed background tasks
-}
-
-impl SimulatedExecutionContext {
     pub fn set_current_session(&self, session_id: SessionId) {
         *self.current_session.lock() = Some(session_id);
     }
@@ -309,15 +271,9 @@ impl SimulatedExecutionContext {
 }
 ```
 
-### Architecture Benefits
+### Cloud-Specific Data Structures
 
-βœ… **Clean Composition**: Unified scheduling primitives + Cloud-specific session coordination
-βœ… **Unified Task Tracking**: Uses TestScheduler's `is_task_running()` for session validation
-βœ… **Natural Coupling**: Session coordination lives where ExecutionContext operates
-βœ… **Minimal Abstraction**: No additional coordinator layer needed
-βœ… **Cloud-Specific Concerns**: Session logic remains in Cloud repo
-βœ… **Test Integration**: Full TestScheduler features available for Cloud testing
-βœ… **Deterministic Simulation**: Session-aware timing and task ordering
+Session coordination is Cloud-specific but built on unified scheduler primitives via wrappers.
 
 ## Scheduler Trait Definition
 
@@ -343,7 +299,8 @@ pub trait Scheduler: Send + Sync {
     fn is_main_thread(&self) -> bool;
     fn now(&self) -> Instant;
 }
-```
+
+
 
 ## TestScheduler (Concrete Implementation)
 
@@ -368,226 +325,136 @@ struct TestSchedulerInner {
     parking_allowed: bool,
     waiting_hint: Option<String>,
     block_tick_range: std::ops::RangeInclusive<usize>,
+    creation_thread_id: ThreadId,  // Added for wrapper checks
 }
 
 impl Scheduler for TestScheduler {
     fn schedule(&self, runnable: Runnable) {
-        let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst));
-        self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running);
-
-        // Schedule completion callback
-        let scheduler = self.clone();
-        let completion_runnable = async_task::spawn(async move {
-            runnable.run();
-            scheduler.mark_task_completed(task_id);
-        }, |_| {}).0;
-
-        completion_runnable.schedule();
+        // Implementation as before
     }
 
     fn schedule_labeled(&self, runnable: Runnable, label: TaskLabel) {
-        let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst));
-
-        if self.inner.borrow().deprioritized_labels.contains(&label) {
-            self.inner.borrow_mut().deprioritized_queue.push((runnable, task_id));
-            self.inner.borrow_mut().task_labels.insert(task_id, label);
-        } else {
-            self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running);
-            let completion_runnable = async_task::spawn(async move {
-                runnable.run();
-                // Mark as completed when done
-            }, |_| {}).0;
-            completion_runnable.schedule();
-        }
+        // Implementation as before, with deprioritization
     }
 
     fn schedule_foreground(&self, runnable: Runnable) {
-        assert!(self.is_main_thread(), "schedule_foreground called off main thread");
-        let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst));
-        self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running);
-
-        let completion_runnable = async_task::spawn(async move {
-            runnable.run();
-            // Mark as completed
-        }, |_| {}).0;
-
-        self.inner.borrow_mut().main_thread_queue.push(completion_runnable);
+        assert!(thread::current().id() == self.inner.borrow().creation_thread_id, "schedule_foreground called off main thread");
+        // Implementation as before
     }
 
-    fn is_main_thread(&self) -> bool { self.inner.borrow().is_main_thread }
-    fn now(&self) -> Instant { self.inner.borrow().now }
-    fn park(&self, timeout: Option<Duration>) -> bool {
-        self.inner.borrow().parker.park_timeout(timeout.unwrap_or(Duration::MAX))
-    }
-    fn unparker(&self) -> Unparker {
-        self.inner.borrow().parker.unparker()
-    }
+    // Other trait methods unchanged
 }
 
 impl TestScheduler {
     // Test-specific methods (NOT on main trait)
-    pub fn deprioritize(&self, label: TaskLabel) {
-        self.inner.borrow_mut().deprioritized_labels.insert(label);
-    }
-
-    pub fn is_task_running(&self, task_id: TaskId) -> bool {
-        self.inner.borrow().tasks.contains_key(&task_id)
-    }
-
+    pub fn deprioritize(&self, label: TaskLabel) { /* implementation */ }
+    pub fn is_task_running(&self, task_id: TaskId) -> bool { /* implementation */ }
     pub fn tick(&self) -> bool { /* implementation */ }
     pub fn run_until_parked(&self) { /* implementation */ }
     pub fn advance_clock(&self, duration: Duration) { /* implementation */ }
+    pub fn assert_main_thread(&self) { /* implementation */ }
 }
 ```
 
 ## Generic Spawn Helpers
 
-Generic spawn methods implemented for `dyn Scheduler`:
-
-```rust
-impl dyn Scheduler {
-    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
-    where R: Send + 'static {
-        let task_id = self.assign_task_id();
-        let (runnable, inner_task) = async_task::spawn(future, move |runnable| {
-            self.mark_task_started(task_id);
-            self.schedule_completion_callback(runnable, task_id);
-        });
-
-        self.schedule(runnable);
-        Task { /* ... */ }
-    }
-
-    pub fn spawn_foreground<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
-    where R: 'static {
-        let task_id = self.assign_task_id();
-        let (runnable, inner_task) = async_task::spawn_local(future, move |runnable| {
-            self.mark_task_started(task_id);
-            self.schedule_completion_callback(runnable, task_id);
-        });
-
-        self.schedule_foreground(runnable);
-        Task { /* ... */ }
-    }
-
-    pub fn timer(&self, duration: Duration) -> Task<()> {
-        if duration.is_zero() {
-            return Task::ready(());
-        }
-
-        let (runnable, inner_task) = async_task::spawn(async move {}, {
-            let scheduler = &*self;
-            move |runnable| {
-                scheduler.schedule_after(duration, runnable);
-            }
-        });
-
-        runnable.schedule();
-        Task { /* ... */ }
-    }
-
-    pub fn is_task_running(&self, task_id: TaskId) -> bool {
-        // Requires downcast to TestScheduler
-        None // Default implementation
-    }
-}
-```
+Generic spawn methods implemented for `dyn Scheduler`, now called by wrappers.
 
 ## Migration Strategy
 
 ### Phase 1: Core Infrastructure
 1. Define `Scheduler` trait (core methods only)
-2. Implement `TestScheduler` (with test features like `deprioritize()`)
-3. Make existing GPUI platform dispatchers implement `Scheduler` trait
-   - MacDispatcher implements `Scheduler` for GCD integration
-   - LinuxDispatcher implements `Scheduler` for thread pools
-   - WindowsDispatcher implements `Scheduler` for Windows ThreadPool
-4. Define `Task<T>` with mandatory TaskId
+2. Implement `TestScheduler` with thread ID tracking
+3. Add wrapper structs `Executor` and `ForegroundExecutor`
+4. Make existing GPUI platform dispatchers implement `Scheduler` trait
+5. Add `as_any()` to `Scheduler` for downcasting
 
 ### Phase 2: GPUI Migration
-1. Update GPUI executors to use trait objects
-2. Add downcasting for test features
+1. Update GPUI executors to use `Executor` and `ForegroundExecutor` wrappers
+2. Handle downcasting in wrappers for test features
 3. Preserve all existing GPUI functionality
 4. Test deployments use TestScheduler, production uses minimal schedulers
 
 ### Phase 3: Cloud Integration
-1. Update `SimulatedExecutionContext` to use `Arc<dyn Scheduler>`
+1. Update `SimulatedExecutionContext` to use `ForegroundExecutor`
 2. Move session coordination logic into `SimulatedExecutionContext`
-3. Integrate `wait_until()` with unified task scheduling
-4. Use TestScheduler features for session validation
+3. Integrate `wait_until()` with wrapper scheduling
+4. Use TestScheduler features for session validation via downcast
 5. Preserve all existing Cloud platform APIs
 
 ### Phase 4: Testing & Validation
 1. GPUI tests work with new architecture
-2. Cloud session behavior preserved
+2. Cloud session behavior preserved (single-threaded)
 3. Production efficiency maintained
 4. Both domains benefit from unified test infrastructure
 
 ## Platform Backend Files
 
 ### GPUI Backends
-- `crates/gpui/src/platform/mac/dispatcher.rs` - `MacDispatcher` should implement `Scheduler` trait
-- `crates/gpui/src/platform/linux/dispatcher.rs` - `LinuxDispatcher` should implement `Scheduler` trait
-- `crates/gpui/src/platform/windows/dispatcher.rs` - `WindowsDispatcher` should implement `Scheduler` trait
+- `crates/gpui/src/platform/mac/dispatcher.rs` - `MacDispatcher` implements `Scheduler`
+- `crates/gpui/src/platform/linux/dispatcher.rs` - `LinuxDispatcher` implements `Scheduler`
+- `crates/gpui/src/platform/windows/dispatcher.rs` - `WindowsDispatcher` implements `Scheduler`
 - `crates/gpui/src/platform/test/dispatcher.rs` - `TestDispatcher` β†’ `TestScheduler` (moved to shared crate)
 
 ### Cloud Backends
-- `crates/platform_simulator/src/platform.rs` - `SimulatedExecutionContext` should contain `Scheduler` + session coordination
-- `crates/cloudflare_platform/src/execution_context.rs` - Cloudflare-specific ExecutionContext using Scheduler
+- `crates/platform_simulator/src/platform.rs` - `SimulatedExecutionContext` uses `ForegroundExecutor`
+- `crates/cloudflare_platform/src/execution_context.rs` - Cloudflare-specific ExecutionContext using `ForegroundExecutor`
 
 ## Compatibility Checklist
 
 ## Complete GPUI + Cloud Feature Coverage βœ…
 
 ### GPUI Compatibility
-- βœ… `spawn()` β†’ `dyn Scheduler::spawn()` (generic helper on trait object)
-- βœ… `spawn_labeled(label)` β†’ `dyn Scheduler::spawn_labeled()` (generic helper on trait object)
-- βœ… `timer(duration)` β†’ `dyn Scheduler::timer()` (generic helper using schedule_after)
-- βœ… `block(future)` β†’ `dyn Scheduler::block()` (generic helper with parking)
-- βœ… `block_with_timeout(future, timeout)` β†’ `dyn Scheduler::block_with_timeout()` (generic helper)
-- βœ… `now()` β†’ `scheduler.now()` (direct trait object method)
-- βœ… `is_main_thread()` β†’ `scheduler.is_main_thread()` (direct trait object method)
-- βœ… `num_cpus()` β†’ `dyn Scheduler::num_cpus()` (generic helper)
-- βœ… `deprioritize(label)` β†’ Downcast to TestScheduler, then TestScheduler::deprioritize()
-- βœ… `tick()` β†’ Downcast to TestScheduler, then TestScheduler::tick()
-- βœ… `run_until_parked()` β†’ Downcast to TestScheduler, then TestScheduler::run_until_parked()
-- βœ… `advance_clock(duration)` β†’ Downcast to TestScheduler, then TestScheduler::advance_clock()
-- βœ… `simulate_random_delay()` β†’ Downcast to TestScheduler, then TestScheduler::simulate_random_delay()
-- βœ… `BackgroundExecutor` β†’ Trait object wrapper using `dyn Scheduler`
+- βœ… `spawn()` β†’ `Executor::spawn()` or `ForegroundExecutor::spawn()`
+- βœ… `spawn_labeled(label)` β†’ Wrappers delegate to `dyn Scheduler::spawn_labeled()`
+- βœ… `timer(duration)` β†’ Wrappers delegate to `dyn Scheduler::timer()`
+- βœ… `block(future)` β†’ Wrappers handle with parking
+- βœ… `block_with_timeout(future, timeout)` β†’ Wrappers handle
+- βœ… `now()` β†’ `scheduler.now()` (direct trait method)
+- βœ… `is_main_thread()` β†’ `scheduler.is_main_thread()` (direct trait method)
+- βœ… `num_cpus()` β†’ Generic helper on wrappers
+- βœ… `deprioritize(label)` β†’ Downcast in wrappers, then TestScheduler::deprioritize()
+- βœ… `tick()` β†’ Downcast in wrappers, then TestScheduler::tick()
+- βœ… `run_until_parked()` β†’ Downcast in wrappers, then TestScheduler::run_until_parked()
+- βœ… `advance_clock(duration)` β†’ Downcast in wrappers, then TestScheduler::advance_clock()
+- βœ… `simulate_random_delay()` β†’ Downcast in wrappers, then TestScheduler::simulate_random_delay()
+- βœ… `BackgroundExecutor` β†’ Uses `Executor` wrapper
 
 ### Cloud Compatibility
-- βœ… **Session Coordination**: `ExecutionContext.wait_until()` with direct session integration
-- βœ… **Task Lifecycle**: Uses unified scheduler's `is_task_running()` for validation
+- βœ… **Session Coordination**: `ExecutionContext.wait_until()` via `ForegroundExecutor`
+- βœ… **Task Lifecycle**: Uses wrapper's TestScheduler access for validation
 - βœ… **Worker Management**: Session context and cleanup validation
 - βœ… **Background Tasks**: Explicit permission system for long-running work
 - βœ… **Deterministic Testing**: Full TestScheduler integration with session tracking
-- βœ… **Platform Runtime**: `PlatformRuntime.delay()` via unified scheduler timer
+- βœ… **Platform Runtime**: `PlatformRuntime.delay()` via wrapper timer
 - βœ… **Session Validation**: Dangling task detection with proper error reporting
 - βœ… **Auto-Association**: Tasks automatically linked to sessions during spawn
 
 ### Unified Benefits
 - βœ… **Clean Separation**: GPUI gets deprioritization, Cloud gets session coordination
-- βœ… **Unified Task Tracking**: Both domains use `TestScheduler.is_task_running()` for validation
+- βœ… **Unified Task Tracking**: Both domains use TestScheduler via wrappers for validation
 - βœ… **Composability**: Session coordination built on unified scheduling primitives
 - βœ… **Domain-Specific**: Each domain handles its coordination concerns appropriately
 - βœ… **Test Infrastructure**: Shared deterministic testing capabilities
 - βœ… **Production Ready**: Both domains can use minimal platform schedulers
 - βœ… **Extensible**: New coordination patterns can be added without shared crate changes
+- βœ… **Thread Safety**: ForegroundExecutor enforces main-thread use across domains
 
 ## Implementation Notes
 
 ### Key Design Decisions
 
-1. **GPUI**: Uses task labels for deterministic UI testing
-2. **Cloud**: Uses session coordination for worker lifecycle management
-3. **Shared**: Core scheduling primitives + TestScheduler for task tracking
-4. **Integration**: Both domains use composition with unified scheduler
+1. **GPUI**: Uses `Executor` for background (Send), `ForegroundExecutor` for main-thread (!Send)
+2. **Cloud**: Uses `ForegroundExecutor` for single-threaded simplicity (no Send required on futures)
+3. **Shared**: Core scheduling primitives + wrappers for delegation and safety
+4. **Integration**: Both domains use wrappers with consistent API
 
 ### Migration Considerations
 
-- **Zero Breaking Changes**: Existing APIs preserved via generic helpers
+- **Zero Breaking Changes**: Existing APIs preserved via wrappers
 - **Gradual Migration**: Can migrate GPUI and Cloud independently
 - **Test Preservation**: All existing test functionality maintained
 - **Performance**: Minimal overhead from trait objects in production
+- **Cloud Simplification**: ForegroundExecutor allows non-Send futures in single-threaded context
 
-This architecture provides clean separation between GPUI's UI determinism needs and Cloud's session coordination requirements, while sharing the core task scheduling infrastructure.
+This architecture provides clean separation between GPUI's UI determinism needs and Cloud's session coordination requirements, while sharing the core task scheduling infrastructure and enforcing thread safety through wrappers.