Restructure unified scheduler architecture: separate GPUI/Cloud sections with comprehensive integration coverage

Nathan Sobo created

- Add dedicated GPUI Integration section with BackgroundExecutor/ForegroundExecutor
- Add dedicated Cloud Integration section with SimulatedExecutionContext session coordination
- Clarify session coordination approach: handled directly in Cloud's SimulatedExecutionContext
- Update migration strategies for both domains
- Balance coverage depth between GPUI and Cloud
- Change present-tense to future-tense planning language
- Reorganize platform backend implementations clearly
- Update architecture diagram to show domain-specific integration

Change summary

thoughts/2025-08-31_15-47-17_unified-scheduler-architecture.md | 1005 +--
1 file changed, 292 insertions(+), 713 deletions(-)

Detailed changes

thoughts/2025-08-31_15-47-17_unified-scheduler-architecture.md πŸ”—

@@ -5,13 +5,13 @@
 A clean layered architecture where:
 - **Core**: Basic scheduling interface (`Scheduler` trait) + test-enhanced concrete impl (`TestScheduler`)
 - **GPUI**: Uses trait objects for production safety, test features via `TestScheduler`
-- **Cloud**: Session wrapper uses `TestScheduler` for session coordination
+- **Cloud**: Session coordination integrated directly in `SimulatedExecutionContext` using unified scheduler primitives
 
 Key design principles:
 - Main `Scheduler` trait has only essential methods (no test pollution)
 - Test-specific features (deprioritization, task tracking) are `TestScheduler`-specific
 - Production schedulers implement minimal interface
-- Cloud requires `TestScheduler` for session features
+- Cloud handles session coordination at ExecutionContext layer using unified primitives
 
 ## Core Architecture
 
@@ -39,19 +39,21 @@ Key design principles:
           β”‚         β”‚         β”‚
 β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”  β”Œβ”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”
 β”‚   GPUI       β”‚  β”‚     Cloud       β”‚
-β”‚ Uses trait   β”‚  β”‚ CloudSimulated  β”‚
-β”‚ objects      β”‚  β”‚ uses Test-     β”‚
-β”‚ + TestSchedulerβ”‚  β”‚ Scheduler     β”‚
+β”‚ Uses trait   β”‚  β”‚ Session coord. β”‚
+β”‚ objects      β”‚  β”‚ in ExecContext β”‚
+β”‚ + TestSchedulerβ”‚  β”‚ + TestSchedulerβ”‚
 β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
 ```
 
-## Platform Scheduler Implementations
+## GPUI Integration
+
+### Platform Scheduler Implementations in GPUI
 
 Platform-specific scheduler implementations **should remain in GPUI's platform modules**:
 
-- **MacScheduler**: Should implement `Scheduler` trait, uses GCD APIs
-- **LinuxScheduler**: Should implement `Scheduler` trait, uses thread pools + calloop
-- **WindowsScheduler**: Should implement `Scheduler` trait, uses Windows ThreadPool APIs
+- **MacDispatcher**: Should implement `Scheduler` trait, uses GCD APIs
+- **LinuxDispatcher**: Should implement `Scheduler` trait, uses thread pools + calloop
+- **WindowsDispatcher**: Should implement `Scheduler` trait, uses Windows ThreadPool APIs
 
 **Rationale**: Platform implementations are substantial and deeply integrated with:
 - Platform-specific threading APIs (GCD, Windows ThreadPool, etc.)
@@ -60,265 +62,288 @@ Platform-specific scheduler implementations **should remain in GPUI's platform m
 
 The shared crate provides only the trait definition and generic helpers, while platform-specific dispatchers implement the `Scheduler` trait directly in GPUI.
 
-## Scheduler Trait Definition
-
-```rust
-pub trait Scheduler: Send + Sync {
-    /// Schedule a runnable to be executed (object-safe core functionality)
-    fn schedule(&self, runnable: Runnable);
-
-    /// Schedule a runnable with label for test tracking
-    fn schedule_labeled(&self, runnable: Runnable, label: TaskLabel);
-
-    /// Schedule a runnable on the main thread (optional, defaults to panic)
-    fn schedule_foreground(&self, runnable: Runnable) {
-        panic!("schedule_foreground not supported by this scheduler");
-    }
+### BackgroundExecutor Integration
 
-    /// Schedule a runnable after a delay (object-safe for timers)
-    fn schedule_after(&self, duration: Duration, runnable: Runnable);
+GPUI's executors will use trait objects for scheduling:
 
-    /// Platform integration methods
-    fn park(&self, timeout: Option<Duration>) -> bool;
-    fn unparker(&self) -> Unparker;
-    fn is_main_thread(&self) -> bool;
-    fn now(&self) -> Instant;
+```rust
+// crates/gpui/src/executor.rs
+pub struct BackgroundExecutor {
+    scheduler: Arc<dyn Scheduler>,  // Any Scheduler implementation via trait objects
 }
-```
-
-**Explanation:**
-- Core trait methods are object-safe (no generic parameters)
-- `schedule` methods operate on `Runnable` for low-level execution control
-- Scheduler implementations manage task state internally when scheduling runnables
-- No task completion hooks needed on `Task` - scheduler tracks running tasks itself
-
-## Generic Spawn Helpers
 
-Generic spawn methods are implemented for `dyn Scheduler` to provide the high-level `Future` interface:
+impl BackgroundExecutor {
+    pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
+        Self { scheduler }
+    }
 
-```rust
-impl dyn Scheduler {
-    /// Spawn Send future (generic helper)
+    // Core spawning methods via generic helpers
     pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
     where R: Send + 'static {
-        let task_id = self.assign_task_id();
-        let task_metadata = TaskMetadata {
-            id: task_id,
-            label: None,
-            session: None,
-            spawn_location: std::panic::Location::caller(),
-        };
-
-        let (runnable, inner_task) = async_task::spawn(future, move |runnable| {
-            // Scheduler manages task lifecycle: mark as started when scheduled
-            self.mark_task_started(task_id);
-            // When runnable completes, scheduler marks as finished
-            self.schedule_completion_callback(runnable, task_id);
-        });
-
-        // Schedule the runnable (this adds to running tasks)
-        self.schedule(runnable);
-
-        Task {
-            inner: TaskState::Spawned(inner_task),
-            metadata: task_metadata,
-        }
+        // Generic spawn helper implemented on dyn Scheduler - full Future support
+        self.scheduler.spawn(future)
     }
 
-    /// Spawn Send future with label (generic helper)
     pub fn spawn_labeled<R>(
         &self,
         label: TaskLabel,
         future: impl Future<Output = R> + Send + 'static
     ) -> Task<R>
     where R: Send + 'static {
-        let task_id = self.assign_task_id();
-        let task_metadata = TaskMetadata {
-            id: task_id,
-            label: Some(label),
-            session: None,
-            spawn_location: std::panic::Location::caller(),
-        };
-
-        let (runnable, inner_task) = async_task::spawn(future, move |runnable| {
-            self.mark_task_started(task_id);
-            self.schedule_completion_callback(runnable, task_id);
-        });
+        // Generic spawn_labeled helper implemented on dyn Scheduler
+        self.scheduler.spawn_labeled(label, future)
+    }
 
-        // Apply test-specific logic (e.g., deprioritization) in scheduler
-        self.schedule_labeled(runnable, label);
+    // Timer functionality via generic helper using schedule_after
+    pub fn timer(&self, duration: Duration) -> Task<()> {
+        self.scheduler.timer(duration)
+    }
 
-        Task {
-            inner: TaskState::Spawned(inner_task),
-            metadata: task_metadata,
+    // Test-specific methods via downcast to TestScheduler
+    pub fn deprioritize(&self, label: TaskLabel) {
+        if let Some(test_scheduler) = self.scheduler.downcast_ref::<TestScheduler>() {
+            test_scheduler.deprioritize(label);
+        } else {
+            // Production: ignore silently
         }
     }
 
-    /// Spawn non-Send future on main thread (generic helper)
-    pub fn spawn_foreground<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
-    where R: 'static {
-        let task_id = self.assign_task_id();
-        let task_metadata = TaskMetadata {
-            id: task_id,
-            label: None,
-            session: None,
-            spawn_location: std::panic::Location::caller(),
-        };
+    pub fn tick(&self) -> Option<bool> {
+        self.scheduler.downcast_ref::<TestScheduler>()
+            .map(|ts| ts.tick())
+    }
+}
+```
 
-        let (runnable, inner_task) = async_task::spawn_local(future, move |runnable| {
-            self.mark_task_started(task_id);
-            self.schedule_completion_callback(runnable, task_id);
-        });
+### ForegroundExecutor Integration
 
-        self.schedule_foreground(runnable);
+```rust
+// crates/gpui/src/executor.rs
+pub struct ForegroundExecutor {
+    scheduler: Rc<dyn Scheduler>,  // Rc for single-threaded use
+}
 
-        Task {
-            inner: TaskState::Spawned(inner_task),
-            metadata: task_metadata,
-        }
+impl ForegroundExecutor {
+    // Core spawning for main thread (non-Send futures)
+    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
+    where R: 'static {
+        // Generic spawn_foreground helper implemented on dyn Scheduler
+        self.scheduler.spawn_foreground(future)
     }
 
-    /// Create a timer task that completes after duration (generic helper)
+    // Timer and test methods same as BackgroundExecutor
     pub fn timer(&self, duration: Duration) -> Task<()> {
-        if duration.is_zero() {
-            return Task::ready(());
-        }
+        self.scheduler.timer(duration)
+    }
+}
+```
 
-        let (runnable, inner_task) = async_task::spawn(async move {}, {
-            let scheduler = &*self;
-            move |runnable| {
-                scheduler.schedule_after(duration, runnable);
-            }
-        });
+## Cloud Integration
 
-        runnable.schedule();
+### Session Coordination in SimulatedExecutionContext
 
-        Task {
-            inner: TaskState::Spawned(inner_task),
-            metadata: TaskMetadata {
-                id: self.assign_task_id(),
-                label: None,
-                session: None,
-                spawn_location: std::panic::Location::caller(),
-            },
-        }
-    }
+Cloud's session coordination logic **should be handled directly within SimulatedExecutionContext**, keeping it close to the ExecutionContext trait implementation and avoiding unnecessary abstraction layers:
+
+```rust
+// crates/platform_simulator/src/platform.rs
+pub struct SimulatedExecutionContext {
+    scheduler: Arc<dyn Scheduler>,  // Unified scheduler via composition
+    session_counter: AtomicUsize,
+    sessions: Mutex<HashMap<SessionId, WorkerSession>>,
+    current_session: Mutex<Option<SessionId>>,
+}
 
-    /// Block current thread until future completes (generic helper)
-    pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
-        let (tx, rx) = std::sync::mpsc::channel();
+#[async_trait(?Send)]
+impl PlatformRuntime for SimulatedExecutionContext {
+    async fn delay(&self, duration: Duration) {
+        // Use unified scheduler's delay mechanism through timer
+        self.scheduler.timer(duration).await;
+    }
+}
+```
 
-        let future = async move {
-            let result = future.await;
-            let _ = tx.send(result);
-        };
+### Wait Until Implementation
 
-        let task = self.spawn(future);
-        task.detach();
+Session coordination integrated directly with unified task scheduling:
 
-        match rx.recv() {
-            Ok(result) => result,
-            Err(_) => panic!("Block operation failed"),
+```rust
+#[async_trait(?Send)]
+impl ExecutionContext for SimulatedExecutionContext {
+    fn wait_until(&self, future: LocalBoxFuture<'static, Result<()>>) -> Result<()> {
+        // 1. Spawn using unified scheduler
+        let task_id = self.scheduler.spawn(async move {
+            // Add delay via scheduler timer for deterministic simulation
+            self.scheduler.timer(Duration::from_millis(10)).await;
+            let _ = future.await;
+        })?;
+
+        // 2. Register with session coordination (direct access)
+        if let Some(session_id) = *self.current_session.lock() {
+            if let Some(session) = self.sessions.lock().get_mut(&session_id) {
+                session.wait_until_task_ids.insert(task_id);
+                self.link_task_to_session(task_id, session_id);
+            }
         }
+
+        Ok(())
     }
 
-    /// Block current thread until future completes or timeout (generic helper)
-    pub fn block_with_timeout<R>(
-        &self,
-        future: impl Future<Output = R>,
-        timeout: Duration
-    ) -> Result<R, std::sync::mpsc::RecvTimeoutError> {
-        let (tx, rx) = std::sync::mpsc::channel();
+    async fn pass_through(&self) -> Result<()> {
+        // Use unified scheduler's timer for delay
+        self.scheduler.timer(Duration::from_millis(10)).await;
+        Ok(())
+    }
+}
+```
 
-        let future = async move {
-            let result = future.await;
-            let _ = tx.send(result);
-        };
+### Session Coordination Methods
 
-        let task = self.spawn(future);
-        task.detach();
+Core session operations handled within SimulatedExecutionContext:
 
-        rx.recv_timeout(timeout)
+```rust
+impl SimulatedExecutionContext {
+    pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
+        Self {
+            scheduler,
+            session_counter: AtomicUsize::new(0),
+            sessions: Mutex::new(HashMap::new()),
+            current_session: Mutex::new(None),
+        }
     }
 
-    /// Get number of available CPUs (generic helper)
-    pub fn num_cpus(&self) -> usize {
-        std::thread::available_parallelism()
-            .map(|n| n.get())
-            .unwrap_or(1)
-    }
+    pub fn create_session(&self) -> SessionId {
+        let session_counter = self.session_counter.fetch_add(1, Ordering::SeqCst);
+        let session_id = SessionId(session_counter);
 
-    /// Run deterministic test tick (requires TestScheduler downcast)
-    pub fn tick(&self) -> Option<bool> {
-        // This requires downcasting to TestScheduler as it's test-specific
-        None // Return None if not TestScheduler
+        self.sessions.lock().insert(session_id, WorkerSession {
+            spawned_tasks: HashSet::new(),
+            wait_until_task_ids: HashSet::new(),
+        });
+
+        session_id
     }
 
-    /// Run all tasks until parked (requires TestScheduler downcast)
-    pub fn run_until_parked(&self) {
-        // This requires downcasting to TestScheduler as it's test-specific
+    pub fn with_session<F, R>(&self, session_id: SessionId, f: F) -> R
+    where
+        F: FnOnce() -> R,
+    {
+        {
+            let mut current = self.current_session.lock();
+            *current = Some(session_id);
+        }
+
+        let result = f();
+
+        {
+            let mut current = self.current_session.lock();
+            *current = None;
+        }
+
+        result
+    }
+
+    pub fn validate_session_cleanup(&self, session_id: SessionId) -> platform_api::Result<()> {
+        let sessions = self.sessions.lock();
+        if let Some(session) = sessions.get(&session_id) {
+            // Check running tasks using unified scheduler's task tracking
+            let dangling_tasks: Vec<TaskId> = session
+                .spawned_tasks
+                .iter()
+                .filter(|&&task_id| self.scheduler.is_task_running(task_id))
+                .copied()
+                .collect();
+
+            // Cloud-specific permission check
+            let unauthorized: Vec<_> = dangling_tasks
+                .into_iter()
+                .filter(|task_id| !session.wait_until_task_ids.contains(task_id))
+                .collect();
+
+            if !unauthorized.is_empty() {
+                return Err(platform_api::WorkerError::Other(anyhow!(
+                    "Session cleanup failed: {} unauthorized tasks still running",
+                    unauthorized.len()
+                )));
+            }
+        }
+        Ok(())
     }
 
-    /// Advance fake clock time (requires TestScheduler downcast)
-    pub fn advance_clock(&self, duration: Duration) {
-        // This requires downcasting to TestScheduler as it's test-specific
+    // Link tasks to sessions during spawning
+    fn link_task_to_session(&self, task_id: TaskId, session_id: SessionId) {
+        if let Some(session) = self.sessions.lock().get_mut(&session_id) {
+            session.spawned_tasks.insert(task_id);
+        }
     }
 
-    /// Simulate random delay (requires TestScheduler downcast)
-    pub fn simulate_random_delay(&self) -> Option<impl Future<Output = ()>> {
-        // This requires downcasting to TestScheduler as it's test-specific
-        None
+    fn spawn_with_session(&self, future: Pin<Box<dyn Future<Output = ()>>>) -> TaskId {
+        let task_id = self.scheduler.spawn(future)?;
+
+        // Auto-associate with current session
+        if let Some(session_id) = *self.current_session.lock() {
+            self.link_task_to_session(task_id, session_id);
+        }
+
+        Ok(task_id)
     }
+}
+```
+
+### Cloud-Specific Data Structures
+
+```rust
+// Session coordination is Cloud-specific but built on unified scheduler
+pub struct WorkerSession {
+    spawned_tasks: HashSet<TaskId>,        // Tracks tasks in session
+    wait_until_task_ids: HashSet<TaskId>,  // Explicitly allowed background tasks
+}
 
-    /// Get seeded RNG (requires TestScheduler downcast)
-    pub fn rng(&self) -> Option<StdRng> {
-        // This requires downcasting to TestScheduler as it's test-specific
-        None
+impl SimulatedExecutionContext {
+    pub fn set_current_session(&self, session_id: SessionId) {
+        *self.current_session.lock() = Some(session_id);
     }
 
-    /// Deprioritize labeled tasks (requires TestScheduler downcast)
-    pub fn deprioritize(&self, label: TaskLabel) {
-        // This requires downcasting to TestScheduler as it's test-specific
+    pub fn get_current_session(&self) -> Option<SessionId> {
+        *self.current_session.lock()
     }
 }
 ```
 
-**Explanation:**
-- Core trait has only essential methods for object safety
-- Comprehensive generic helpers provide all GPUI executor APIs
-- Test-specific methods delegate to TestScheduler via downcasting when available
-- Production schedulers can ignore test methods (return None/default behavior)
-- Timer uses schedule_after for delayed execution
-- Blocking operations implemented using channels and task detachment
+### Architecture Benefits
+
+βœ… **Clean Composition**: Unified scheduling primitives + Cloud-specific session coordination
+βœ… **Unified Task Tracking**: Uses TestScheduler's `is_task_running()` for session validation
+βœ… **Natural Coupling**: Session coordination lives where ExecutionContext operates
+βœ… **Minimal Abstraction**: No additional coordinator layer needed
+βœ… **Cloud-Specific Concerns**: Session logic remains in Cloud repo
+βœ… **Test Integration**: Full TestScheduler features available for Cloud testing
+βœ… **Deterministic Simulation**: Session-aware timing and task ordering
 
-## Task<T> Definition
+## Scheduler Trait Definition
 
 ```rust
-#[derive(Debug)]
-pub struct Task<T> {
-    inner: TaskState<T>,
-    id: TaskId,  // Mandatory for coordination
-    metadata: TaskMetadata,
-}
+pub trait Scheduler: Send + Sync {
+    /// Schedule a runnable to be executed (object-safe core functionality)
+    fn schedule(&self, runnable: Runnable);
 
-#[derive(Debug)]
-pub struct TaskMetadata {
-    label: Option<TaskLabel>,        // GPUI test identification
-    session: Option<SessionId>,      // Cloud session association
-    spawn_location: Option<&'static std::panic::Location>,
-}
+    /// Schedule a runnable with label for test tracking
+    fn schedule_labeled(&self, runnable: Runnable, label: TaskLabel);
 
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-pub struct TaskId(pub usize);
+    /// Schedule a runnable on the main thread (optional, defaults to panic)
+    fn schedule_foreground(&self, runnable: Runnable) {
+        panic!("schedule_foreground not supported by this scheduler");
+    }
 
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-pub struct TaskLabel(NonZeroUsize);
-```
+    /// Schedule a runnable after a delay (object-safe for timers)
+    fn schedule_after(&self, duration: Duration, runnable: Runnable);
 
-**Explanation:**
-- Mandatory TaskId for session coordination
-- Optional metadata for GPUI labels and Cloud sessions
-- Task implements Future directly
+    /// Platform integration methods
+    fn park(&self, timeout: Option<Duration>) -> bool;
+    fn unparker(&self) -> Unparker;
+    fn is_main_thread(&self) -> bool;
+    fn now(&self) -> Instant;
+}
+```
 
 ## TestScheduler (Concrete Implementation)
 
@@ -330,7 +355,7 @@ pub struct TestScheduler {
 struct TestSchedulerInner {
     tasks: HashMap<TaskId, TaskState>,
     task_labels: HashMap<TaskId, TaskLabel>,
-    deprioritized_labels: HashSet<TaskLabel>,  // Test-specific state
+    deprioritized_labels: HashSet<TaskLabel>,
     deprioritized_queue: VecDeque<(Runnable, TaskId)>,
     main_thread_queue: VecDeque<Runnable>,
     delayed: Vec<(Instant, Runnable)>,
@@ -338,46 +363,54 @@ struct TestSchedulerInner {
     is_main_thread: bool,
     now: Instant,
     next_task_id: AtomicUsize,
-    rng: StdRng,  // Seeded random number generator for test determinism
-    waiting_tasks: HashSet<TaskId>,  // Track tasks marked as waiting
-    parking_allowed: bool,  // Control parking behavior
-    waiting_hint: Option<String>,  // Debug hint for parked state
-    block_tick_range: std::ops::RangeInclusive<usize>,  // Control block timeout behavior
+    rng: StdRng,
+    waiting_tasks: HashSet<TaskId>,
+    parking_allowed: bool,
+    waiting_hint: Option<String>,
+    block_tick_range: std::ops::RangeInclusive<usize>,
 }
 
 impl Scheduler for TestScheduler {
     fn schedule(&self, runnable: Runnable) {
-        let task_id = self.next_task_id.fetch_add(1, Ordering::SeqCst);
+        let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst));
         self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running);
 
-        // Schedule the runnable and setup completion callback
+        // Schedule completion callback
         let scheduler = self.clone();
-        let completion_runnable = self.create_completion_runnable(runnable, task_id);
+        let completion_runnable = async_task::spawn(async move {
+            runnable.run();
+            scheduler.mark_task_completed(task_id);
+        }, |_| {}).0;
+
         completion_runnable.schedule();
     }
 
     fn schedule_labeled(&self, runnable: Runnable, label: TaskLabel) {
-        let task_id = self.next_task_id.fetch_add(1, Ordering::SeqCst);
+        let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst));
 
-        // Apply deprioritization if label is registered
         if self.inner.borrow().deprioritized_labels.contains(&label) {
-            // Store label association and put in deprioritized queue
             self.inner.borrow_mut().deprioritized_queue.push((runnable, task_id));
             self.inner.borrow_mut().task_labels.insert(task_id, label);
         } else {
             self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running);
-            let completion_runnable = self.create_completion_runnable(runnable, task_id);
+            let completion_runnable = async_task::spawn(async move {
+                runnable.run();
+                // Mark as completed when done
+            }, |_| {}).0;
             completion_runnable.schedule();
         }
     }
 
     fn schedule_foreground(&self, runnable: Runnable) {
         assert!(self.is_main_thread(), "schedule_foreground called off main thread");
-        let task_id = self.next_task_id.fetch_add(1, Ordering::SeqCst);
+        let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst));
         self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running);
 
-        let completion_runnable = self.create_completion_runnable(runnable, task_id);
-        // Schedule on main thread queue
+        let completion_runnable = async_task::spawn(async move {
+            runnable.run();
+            // Mark as completed
+        }, |_| {}).0;
+
         self.inner.borrow_mut().main_thread_queue.push(completion_runnable);
     }
 
@@ -386,46 +419,13 @@ impl Scheduler for TestScheduler {
     fn park(&self, timeout: Option<Duration>) -> bool {
         self.inner.borrow().parker.park_timeout(timeout.unwrap_or(Duration::MAX))
     }
-
     fn unparker(&self) -> Unparker {
         self.inner.borrow().parker.unparker()
     }
 }
 
 impl TestScheduler {
-    fn assign_task_id(&self) -> TaskId {
-        TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst))
-    }
-
-    fn mark_task_started(&self, task_id: TaskId) {
-        // Task already marked running in schedule methods
-    }
-
-    fn schedule_completion_callback(&self, runnable: Runnable, task_id: TaskId) -> Runnable {
-        let scheduler = self.clone();
-        async_task::spawn(async move {
-            // Run the original runnable
-            runnable.schedule();
-            // Mark task as completed when done
-            scheduler.mark_task_completed(task_id);
-        }, |_| {}).0
-    }
-
-    fn mark_task_completed(&self, task_id: TaskId) {
-        self.inner.borrow_mut().tasks.remove(&task_id);
-    }
-
-    fn create_completion_runnable(&self, runnable: Runnable, task_id: TaskId) -> Runnable {
-        let scheduler = self.clone();
-        async_task::spawn(async move {
-            runnable.schedule();
-            scheduler.mark_task_completed(task_id);
-        }, |_| {}).0
-    }
-}
-
-// Test-specific methods (NOT on main trait)
-impl TestScheduler {
+    // Test-specific methods (NOT on main trait)
     pub fn deprioritize(&self, label: TaskLabel) {
         self.inner.borrow_mut().deprioritized_labels.insert(label);
     }
@@ -434,495 +434,74 @@ impl TestScheduler {
         self.inner.borrow().tasks.contains_key(&task_id)
     }
 
-    // Additional internal methods for task lifecycle management
-    fn move_to_deprioritized_queue(&self, task_id: TaskId) {
-        // Move task to deprioritized queue for deterministic testing
-        // This is called from deprioritize to move already scheduled tasks
-        if let Some(runnable) = self.inner.borrow_mut().tasks.remove(&task_id) {
-            self.inner.borrow_mut().deprioritized_queue.push_back((runnable, task_id));
-        }
-    }
-
-    // GPUI Test Infrastructure Methods
-    pub fn tick(&self) -> bool {
-        // Run exactly one pending task
-        if let Some((runnable, task_id)) = self.inner.borrow_mut().deprioritized_queue.pop_front() {
-            let completion_runnable = self.create_completion_runnable(runnable, task_id);
-            completion_runnable.schedule();
-            true
-        } else if let Some(task_id) = self.inner.borrow().tasks.keys().next().cloned() {
-            // Simulate running one task by marking it complete
-            self.inner.borrow_mut().tasks.remove(&task_id);
-            true
-        } else {
-            false
-        }
-    }
-
-    pub fn run_until_parked(&self) {
-        // Run all tasks until none remain
-        while self.tick() {}
-    }
-
-    pub fn advance_clock(&self, duration: Duration) {
-        // Advance fake time for timer testing
-        self.inner.borrow_mut().now += duration;
-        // Process any delayed tasks that are now ready
-        let now = self.inner.borrow().now;
-        let mut to_schedule = Vec::new();
-
-        let mut inner = self.inner.borrow_mut();
-        inner.delayed.retain(|(time, runnable)| {
-            if *time <= now {
-                to_schedule.push(runnable.clone());
-                false
-            } else {
-                true
-            }
-        });
-
-        drop(inner);
-        for runnable in to_schedule {
-            self.schedule(runnable);
-        }
-    }
-
-    pub fn simulate_random_delay(&self) -> impl Future<Output = ()> {
-        // Simulate random delay using seeded RNG
-        let delay = Duration::from_millis(self.inner.borrow().rng.gen_range(0..10));
-        async move {
-            // This would be implemented as a timer in real system
-        }
-    }
-
-    pub fn start_waiting(&self) {
-        // GPUI test debugging - mark that current task is waiting
-        // Implementation would track waiting tasks for debugging
-    }
-
-    pub fn finish_waiting(&self) {
-        // GPUI test debugging - mark that current task finished waiting
-        // Implementation would remove waiting task tracking
-    }
-
-    pub fn allow_parking(&self) {
-        // Allow the scheduler to park when idle
-        // Implementation would modify parking behavior
-    }
-
-    pub fn forbid_parking(&self) {
-        // Prevent scheduler from parking
-        // Implementation would modify parking behavior
-    }
-
-    pub fn set_waiting_hint(&self, msg: Option<String>) {
-        // Set hint message for when scheduler is parked without tasks
-        // Implementation would store hint for debugging
-    }
-
-    pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
-        // Set range for how many ticks to run in block_with_timeout
-        // Implementation would store range for block behavior control
-    }
-
-    pub fn rng(&self) -> StdRng {
-        // Return seeded random number generator
-        self.inner.borrow().rng.clone()
-    }
-}
-
-    // Task creation now handled by generic spawn helpers
-    // Runnable scheduling managed internally by schedule methods
-}
-```
-
-**Explanation:**
-- `deprioritize()` is a TestScheduler-specific method (not on main trait)
-- `spawn_labeled()` is TestScheduler-specific (not on main trait)
-- `is_task_running()` provides task status for Cloud session validation
-- Test-specific state stays in TestScheduler
-
-## Platform Scheduler Implementations in GPUI
-
-The following example shows how existing GPUI platform dispatchers would implement the `Scheduler` trait. These implementations **remain in GPUI's platform modules**, not in the shared scheduler crate.
-
-```rust
-// crates/gpui/src/platform/mac/scheduler.rs (renamed from dispatcher.rs)
-// This should implement Scheduler directly on existing platform-specific code
-
-// Example implementation (to be added in Phase 1):
-impl Scheduler for MacDispatcher {
-    fn schedule(&self, runnable: Runnable) {
-        // Direct mapping to existing GCD implementation
-        unsafe {
-            dispatch_async_f(
-                dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH.try_into().unwrap(), 0),
-                runnable.into_raw().as_ptr() as *mut c_void,
-                Some(trampoline),
-            );
-        }
-    }
-
-    fn schedule_labeled(&self, runnable: Runnable, _label: TaskLabel) {
-        // Production scheduler ignores labels (existing behavior)
-        unsafe {
-            dispatch_async_f(
-                dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH.try_into().unwrap(), 0),
-                runnable.into_raw().as_ptr() as *mut c_void,
-                Some(trampoline),
-            );
-        }
-    }
-
-    fn schedule_foreground(&self, runnable: Runnable) {
-        // Existing dispatch_on_main_thread implementation
-        unsafe {
-            dispatch_async_f(
-                dispatch_get_main_queue(),
-                runnable.into_raw().as_ptr() as *mut c_void,
-                Some(trampoline),
-            );
-        }
-    }
-
-    fn schedule_after(&self, duration: Duration, runnable: Runnable) {
-        // Existing dispatch_after implementation
-        unsafe {
-            let queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH.try_into().unwrap(), 0);
-            let when = dispatch_time(DISPATCH_TIME_NOW as u64, duration.as_nanos() as i64);
-            dispatch_after_f(when, queue, runnable.into_raw().as_ptr() as *mut c_void, Some(trampoline));
-        }
-    }
-
-    fn is_main_thread(&self) -> bool {
-        // Existing macOS-specific main thread detection
-        unsafe {
-            let is_main_thread: BOOL = msg_send![class!(NSThread), isMainThread];
-            is_main_thread == YES
-        }
-    }
-
-    fn now(&self) -> Instant {
-        Instant::now()  // Existing implementation
-    }
-
-    fn park(&self, timeout: Option<Duration>) -> bool {
-        // Existing parking implementation
-        if let Some(timeout) = timeout {
-            self.parker.lock().park_timeout(timeout)
-        } else {
-            self.parker.lock().park();
-            true
-        }
-    }
-
-    fn unparker(&self) -> Unparker {
-        // Existing unparker implementation
-        self.parker.lock().unparker()
-    }
+    pub fn tick(&self) -> bool { /* implementation */ }
+    pub fn run_until_parked(&self) { /* implementation */ }
+    pub fn advance_clock(&self, duration: Duration) { /* implementation */ }
 }
 ```
 
-**Key Points:**
-- Platform scheduler implementations **remain in GPUI platform modules** (e.g., `platform/mac/`, `platform/linux/`, `platform/windows/`)
-- Existing platform dispatchers implement `Scheduler` trait directly - no code needs to be moved
-- Substantial platform-specific code stays where it belongs, integrated with event loops
-- GPUI executors use trait objects: `Arc<dyn Scheduler>` pointing to platform implementations
-- Shared crate provides only trait definition + TestScheduler + generic helpers
+## Generic Spawn Helpers
 
-## GPUI Integration
+Generic spawn methods implemented for `dyn Scheduler`:
 
 ```rust
-// BackgroundExecutor uses trait objects with comprehensive method support
-pub struct BackgroundExecutor {
-    scheduler: Arc<dyn Scheduler>,  // Any Scheduler implementation via trait objects
-}
-
-impl BackgroundExecutor {
-    pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
-        Self { scheduler }
-    }
-
-    // Core spawning methods via generic helpers
+impl dyn Scheduler {
     pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
     where R: Send + 'static {
-        // Generic spawn helper implemented on dyn Scheduler - full Future support
-        self.scheduler.spawn(future)
-    }
-
-    pub fn spawn_labeled<R>(
-        &self,
-        label: TaskLabel,
-        future: impl Future<Output = R> + Send + 'static
-    ) -> Task<R>
-    where R: Send + 'static {
-        // Generic spawn_labeled helper implemented on dyn Scheduler
-        self.scheduler.spawn_labeled(label, future)
-    }
-
-    // Timer functionality via generic helper using schedule_after
-    pub fn timer(&self, duration: Duration) -> Task<()> {
-        self.scheduler.timer(duration)
-    }
-
-    // Blocking operations via generic helpers
-    pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
-        self.scheduler.block(future)
-    }
-
-    pub fn block_with_timeout<R>(
-        &self,
-        future: impl Future<Output = R>,
-        timeout: Duration
-    ) -> Result<R, std::sync::mpsc::RecvTimeoutError> {
-        self.scheduler.block_with_timeout(future, timeout)
-    }
-
-    // Direct trait object methods
-    pub fn now(&self) -> Instant {
-        self.scheduler.now()
-    }
-
-    pub fn is_main_thread(&self) -> bool {
-        self.scheduler.is_main_thread()
-    }
-
-    pub fn num_cpus(&self) -> usize {
-        self.scheduler.num_cpus()
-    }
-
-    // Test-specific methods via downcast to TestScheduler
-    pub fn deprioritize(&self, label: TaskLabel) {
-        if let Some(test_scheduler) = self.scheduler.downcast_ref::<TestScheduler>() {
-            test_scheduler.deprioritize(label);
-        } else {
-            // Production: ignore silently
-        }
-    }
-
-    pub fn tick(&self) -> Option<bool> {
-        self.scheduler.downcast_ref::<TestScheduler>()
-            .map(|ts| ts.tick())
-    }
-
-    pub fn run_until_parked(&self) {
-        if let Some(test_scheduler) = self.scheduler.downcast_ref::<TestScheduler>() {
-            test_scheduler.run_until_parked();
-        }
-    }
+        let task_id = self.assign_task_id();
+        let (runnable, inner_task) = async_task::spawn(future, move |runnable| {
+            self.mark_task_started(task_id);
+            self.schedule_completion_callback(runnable, task_id);
+        });
 
-    pub fn advance_clock(&self, duration: Duration) {
-        if let Some(test_scheduler) = self.scheduler.downcast_ref::<TestScheduler>() {
-            test_scheduler.advance_clock(duration);
-        }
+        self.schedule(runnable);
+        Task { /* ... */ }
     }
-}
 
-// ForegroundExecutor also uses trait objects with full GPUI API support
-pub struct ForegroundExecutor {
-    scheduler: Rc<dyn Scheduler>,
-}
-
-impl ForegroundExecutor {
-    // Core spawning for main thread (non-Send futures)
-    pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
+    pub fn spawn_foreground<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
     where R: 'static {
-        // Generic spawn_foreground helper implemented on dyn Scheduler
-        self.scheduler.spawn_foreground(future)
-    }
-
-    // All the same methods available as BackgroundExecutor
-    pub fn timer(&self, duration: Duration) -> Task<()> {
-        self.scheduler.timer(duration)
-    }
-
-    pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
-        self.scheduler.block(future)
-    }
-
-    pub fn block_with_timeout<R>(
-        &self,
-        future: impl Future<Output = R>,
-        timeout: Duration
-    ) -> Result<R, std::sync::mpsc::RecvTimeoutError> {
-        self.scheduler.block_with_timeout(future, timeout)
-    }
-
-    pub fn now(&self) -> Instant {
-        self.scheduler.now()
-    }
-
-    pub fn is_main_thread(&self) -> bool {
-        self.scheduler.is_main_thread()
-    }
-
-    pub fn num_cpus(&self) -> usize {
-        self.scheduler.num_cpus()
-    }
-
-    // Test-specific methods via downcast (same as BackgroundExecutor)
-    pub fn deprioritize(&self, label: TaskLabel) {
-        if let Some(test_scheduler) = self.scheduler.downcast_ref::<TestScheduler>() {
-            test_scheduler.deprioritize(label);
-        }
-    }
-
-    pub fn tick(&self) -> Option<bool> {
-        self.scheduler.downcast_ref::<TestScheduler>()
-            .map(|ts| ts.tick())
-    }
-
-    pub fn run_until_parked(&self) {
-        if let Some(test_scheduler) = self.scheduler.downcast_ref::<TestScheduler>() {
-            test_scheduler.run_until_parked();
-        }
-    }
-
-    pub fn advance_clock(&self, duration: Duration) {
-        if let Some(test_scheduler) = self.scheduler.downcast_ref::<TestScheduler>() {
-            test_scheduler.advance_clock(duration);
-        }
-    }
-}
-
-**Explanation:**
-- GPUI executors use trait objects for production safety and object-safe `Scheduler` trait
-- Generic spawn helpers provide the familiar Future-based API on `dyn Scheduler`
-- Object-safe schedule methods allow trait object usage without downcasting for basic operations
-- Test features still require downcasting to `TestScheduler` for deprioritization
-- Production deployments can use minimal schedulers via trait objects
-- Test deployments get full test features through TestScheduler wrapper
-
-## Cloud Integration
-
-```rust
-// Cloud wrapper requires TestScheduler for session features and task tracking
-pub struct CloudSimulatedScheduler {
-    scheduler: Arc<dyn Scheduler>,  // Object-safe scheduler (usually TestScheduler)
-    inner: RefCell<CloudSimulatedSchedulerInner>,
-}
-
-struct CloudSimulatedSchedulerInner {
-    current_session: Option<SessionId>,
-    sessions: HashMap<SessionId, SessionData>,
-    task_to_session: HashMap<TaskId, SessionId>,
-}
-
-impl CloudSimulatedScheduler {
-    pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
-        Self {
-            scheduler,
-            inner: RefCell::new(CloudSimulatedSchedulerInner {
-                current_session: None,
-                sessions: HashMap::new(),
-                task_to_session: HashMap::new(),
-            }),
-        }
-    }
-
-    // Use generic spawn helpers with session tracking
-    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
-    where R: Send + 'static {
-        // Get task from generic spawn helper (includes task_id assignment)
-        let task = self.scheduler.spawn(future);
-
-        // Auto-associate with current session
-        if let Some(session_id) = self.inner.borrow().current_session.clone() {
-            self.inner.borrow_mut().task_to_session.insert(task.metadata.id, session_id.clone());
-            // Track spawned task in session
-            if let Some(session) = self.inner.borrow_mut().sessions.get_mut(&session_id) {
-                session.spawned_tasks.push(task.metadata.id);
-            }
-        }
+        let task_id = self.assign_task_id();
+        let (runnable, inner_task) = async_task::spawn_local(future, move |runnable| {
+            self.mark_task_started(task_id);
+            self.schedule_completion_callback(runnable, task_id);
+        });
 
-        task
+        self.schedule_foreground(runnable);
+        Task { /* ... */ }
     }
 
-    pub fn spawn_labeled<R>(
-        &self,
-        label: TaskLabel,
-        future: impl Future<Output = R> + Send + 'static
-    ) -> Task<R>
-    where R: Send + 'static {
-        // Use generic spawn_labeled helper
-        let task = self.scheduler.spawn_labeled(label, future);
-
-        // Auto-associate with current session
-        if let Some(session_id) = self.inner.borrow().current_session.clone() {
-            self.inner.borrow_mut().task_to_session.insert(task.metadata.id, session_id.clone());
-            // Track spawned task in session
-            if let Some(session) = self.inner.borrow_mut().sessions.get_mut(&session_id) {
-                session.spawned_tasks.push(task.metadata.id);
-            }
+    pub fn timer(&self, duration: Duration) -> Task<()> {
+        if duration.is_zero() {
+            return Task::ready(());
         }
 
-        task
-    }
-
-    pub fn validate_session_cleanup(&self, session_id: SessionId) -> Result<()> {
-        // Use TestScheduler's internal task tracking for validation
-        if let Some(test_scheduler) = self.scheduler.downcast_ref::<TestScheduler>() {
-            let inner = self.inner.borrow();
-
-            if let Some(session) = inner.sessions.get(&session_id) {
-                let running_tasks: Vec<TaskId> = session
-                    .spawned_tasks
-                    .iter()
-                    .filter(|&&task_id| test_scheduler.is_task_running(task_id))
-                    .copied()
-                    .collect();
-
-                // Check against explicit wait_until permissions
-                let unauthorized = running_tasks.difference(&session.wait_until_task_ids);
-
-                if unauthorized.next().is_some() {
-                    return Err(anyhow!("Session cleanup failed: unauthorized tasks still running"));
-                }
+        let (runnable, inner_task) = async_task::spawn(async move {}, {
+            let scheduler = &*self;
+            move |runnable| {
+                scheduler.schedule_after(duration, runnable);
             }
-        } else {
-            // Production scheduler: no task tracking available
-            return Err(anyhow!("Session validation requires TestScheduler"));
-        }
-
-        Ok(())
-    }
-
-    // Session management methods
-    pub fn create_session(&self) -> SessionId {
-        let session_id = SessionId::new();
-        self.inner.borrow_mut().sessions.insert(session_id.clone(), SessionData {
-            spawned_tasks: Vec::new(),
-            wait_until_task_ids: HashSet::new(),
         });
-        self.inner.borrow_mut().current_session = Some(session_id.clone());
-        session_id
+
+        runnable.schedule();
+        Task { /* ... */ }
     }
 
-    pub fn add_wait_until_task(&self, session_id: SessionId, task_id: TaskId) {
-        if let Some(session) = self.inner.borrow_mut().sessions.get_mut(&session_id) {
-            session.wait_until_task_ids.insert(task_id);
-        }
+    pub fn is_task_running(&self, task_id: TaskId) -> bool {
+        // Requires downcast to TestScheduler
+        None // Default implementation
     }
 }
 ```
 
-**Explanation:**
-- Cloud wrapper uses object-safe `Scheduler` trait with generic spawn helpers
-- Internal task management: scheduler tracks running tasks, Cloud wrapper associates with sessions
-- Session tracking enhanced: tasks automatically associated via spawn helpers and metadata
-- Task lifecycle: scheduler manages completion internally, Cloud validates against running tasks
-- Test features: downcast to TestScheduler for `is_task_running()` validation
-- Production safety: uses trait objects, but session features require TestScheduler
-
 ## Migration Strategy
 
 ### Phase 1: Core Infrastructure
 1. Define `Scheduler` trait (core methods only)
 2. Implement `TestScheduler` (with test features like `deprioritize()`)
 3. Make existing GPUI platform dispatchers implement `Scheduler` trait
-   - MacDispatcher should implement `Scheduler` for GCD integration
-   - LinuxDispatcher should implement `Scheduler` for thread pools
-   - WindowsDispatcher should implement `Scheduler` for Windows ThreadPool
+   - MacDispatcher implements `Scheduler` for GCD integration
+   - LinuxDispatcher implements `Scheduler` for thread pools
+   - WindowsDispatcher implements `Scheduler` for Windows ThreadPool
 4. Define `Task<T>` with mandatory TaskId
 
 ### Phase 2: GPUI Migration