diff --git a/thoughts/2025-08-31_15-47-17_test-scheduler-design.md b/thoughts/2025-08-31_15-47-17_test-scheduler-design.md index 0acc6cfab63cfc1e1fee2ff3a7187fcdfbf66f8e..09ed09c5b05af87461c9a0d8016cc07b256ffb2d 100644 --- a/thoughts/2025-08-31_15-47-17_test-scheduler-design.md +++ b/thoughts/2025-08-31_15-47-17_test-scheduler-design.md @@ -1,24 +1,16 @@ - -zed/thoughts/2025-08-31_15-47-17_test-scheduler-design.md - - - -Updating TestScheduler to use Mutex instead of RwLock and fix lock dropping in tick_internal for single-threaded safety - - # TestScheduler Design Details -This document expands on the [Unified Scheduler Architecture Plan](2025-08-31_15-47-17_unified-scheduler-architecture.md) by providing a detailed design and complete implementation of the TestScheduler. It assumes familiarity with the broader architecture, including the shared `Scheduler` trait, domain separation (GPUI vs. Cloud), and multi-threading test scenarios. +This document expands on the [Unified Scheduler Architecture Plan](2025-08-31_15-47-17_unified-scheduler-architecture.md) by providing a detailed design and complete implementation of the TestScheduler. It assumes familiarity with the broader architecture, including the shared `Scheduler` trait, domain separation (GPUI vs. Cloud), and multi-threading test scenarios. **Updates incorporate Executor and ForegroundExecutor wrappers around Arc, with ForegroundExecutor using PhantomData> for !Send and panicking if not on the creation thread, applied to both GPUI and Cloud for consistency and simplicity.** ## Overview The TestScheduler is the **single concrete test implementation** of the `Scheduler` trait (see Section 3: Scheduler Trait Definition in the original plan). It serves as the unified core for all test scenarios, enabling: - **GPUI Testing**: Deterministic UI scheduling with task labels, deprioritization, main thread isolation, and tick-based execution (see Section 4.1: GPUI Integration in the original plan). -- **Cloud Testing**: Session coordination, time-range delays, wait-until task tracking, and cleanup validation (see Section 5: Cloud Integration in the original plan). +- **Cloud Testing**: Session coordination, time-range delays, wait-until task tracking, and cleanup validation (see Section 5: Cloud Integration in the original plan). **ForegroundExecutor is now used in Cloud for single-threaded simplicity, avoiding Send requirements on futures.** - **Unified Testing**: Shared across test threads for client-cloud interactions, seeded randomness, and task lifecycle management. -There is **no separate TestScheduler trait**—all test-specific methods are directly on the TestScheduler struct for simplicity, as it is the only implementation in the test context. +There is **no separate TestScheduler trait**—all test-specific methods are directly on the TestScheduler struct for simplicity, as it is the only implementation in the test context. **Executors wrap Arc, with ForegroundExecutor enforcing thread safety via phantom Rc and creation-thread checks.** ## Design Principles @@ -26,7 +18,8 @@ There is **no separate TestScheduler trait**—all test-specific methods are dir - **Merged Capabilities**: Combines GPUI queues with Cloud session logic in a single state machine. - **Determinism**: Always seeded with configurable randomization. - **Multi-threading Ready**: `Arc` and `Mutex` for shared access in collaborative tests. -- **Domain Wrappers**: GPUI/Cloud test code wraps this core for specific APIs (e.g., GPUI's BackgroundExecutor). +- **Domain Wrappers**: GPUI/Cloud test code wraps this core for specific APIs (e.g., GPUI's BackgroundExecutor now uses Executor and ForegroundExecutor). +- **Thread Safety Enforcement**: ForegroundExecutor uses phantom Rc for !Send and checks against creation thread for main-thread isolation. ## Complete TestScheduler Implementation @@ -38,6 +31,7 @@ use std::pin::Pin; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, Waker}; +use std::thread; use std::time::{Duration, Instant}; use anyhow::{anyhow, Result}; @@ -179,13 +173,13 @@ struct SchedulerState { parking_allowed: AtomicBool, execution_history: Vec, fuzz_inputs: Option, + creation_thread_id: thread::ThreadId, // Added for thread safety checks } // Concrete implementation pub struct TestScheduler { state: Arc>, } -``` impl TestScheduler { /// Primary constructor: Create a scheduler from full configuration. @@ -215,10 +209,19 @@ impl TestScheduler { parking_allowed: AtomicBool::new(false), execution_history: Vec::new(), fuzz_inputs, + creation_thread_id: thread::current().id(), // Capture creation thread }; Arc::new(Self { state: Arc::new(Mutex::new(state)) }) } + // Added for ForegroundExecutor thread checks + pub fn assert_main_thread(&self) { + let state = self.state.lock().unwrap(); + if thread::current().id() != state.creation_thread_id { + panic!("ForegroundExecutor method called from wrong thread"); + } + } + /// Convenience helper: Create a seeded scheduler (randomization enabled by default). pub fn from_seed(seed: u64) -> Arc { Self::new(SchedulerConfig::seeded(seed)) @@ -378,24 +381,41 @@ impl TestScheduler { ## GPUI Usage Example -GPUI wraps the TestScheduler to maintain its existing API: +GPUI wraps the TestScheduler using Executor and ForegroundExecutor: ```rust -pub struct BackgroundExecutor { - scheduler: Arc, // TestScheduler implements Scheduler +use std::marker::PhantomData; +use std::rc::Rc; + +// Generic Executor for background tasks (Send futures) +pub struct Executor { + scheduler: Arc, } -impl BackgroundExecutor { +impl Executor { + pub fn new(scheduler: Arc) -> Self { + Self { scheduler } + } + + pub fn spawn(&self, future: impl Future + Send + 'static) -> Task + where R: Send + 'static { + // Delegate to scheduler via downcast for test methods if applicable + if let Some(test_sched) = self.scheduler.as_any().downcast_ref::() { + // Use test_sched methods + } + self.scheduler.spawn(future) + } + pub fn spawn_labeled( - &self, + &self, label: TaskLabel, future: impl Future + Send + 'static - ) -> Task { - // Direct downcast for GPUI test features + ) -> Task + where R: Send + 'static { if let Some(test_sched) = self.scheduler.as_any().downcast_ref::() { - test_sched.deprioritize(label); // Use test method directly + test_sched.deprioritize(label); } - self.scheduler.spawn(future) + self.scheduler.spawn_labeled(label, future) } pub fn deprioritize(&self, label: TaskLabel) { @@ -403,23 +423,81 @@ impl BackgroundExecutor { test_sched.deprioritize(label); } } + + pub fn timer(&self, duration: Duration) -> Task<()> { + self.scheduler.timer(duration) + } + + pub fn tick(&self) -> Option { + self.scheduler.as_any().downcast_ref::().map(|ts| ts.tick(false)) + } +} + +// ForegroundExecutor for main-thread tasks (!Send futures, thread checks) +pub struct ForegroundExecutor { + executor: Executor, + _phantom: PhantomData>, // Enforces !Send +} + +impl !Send for ForegroundExecutor {} // Explicitly !Send + +impl ForegroundExecutor { + pub fn new(scheduler: Arc) -> Result { + let executor = Executor::new(scheduler); + // Check thread immediately via scheduler + if let Some(test_sched) = executor.scheduler.as_any().downcast_ref::() { + test_sched.assert_main_thread(); + } else { + // Production: assume created on main thread + } + Ok(Self { executor, _phantom: PhantomData }) + } + + pub fn spawn(&self, future: impl Future + 'static) -> Task + where R: 'static { + // Assert thread before delegating + if let Some(test_sched) = self.executor.scheduler.as_any().downcast_ref::() { + test_sched.assert_main_thread(); + } + self.executor.scheduler.spawn_foreground(future) + } + + pub fn timer(&self, duration: Duration) -> Task<()> { + if let Some(test_sched) = self.executor.scheduler.as_any().downcast_ref::() { + test_sched.assert_main_thread(); + } + self.executor.scheduler.timer(duration) + } + + // Other methods mirror Executor but with thread checks } ``` ## Cloud Usage Example -Cloud wraps for session coordination: +Cloud wraps using ForegroundExecutor for single-threaded simplicity (no Send futures required): ```rust impl SimulatedExecutionContext { + pub fn new(scheduler: Arc) -> Result { + let fg_executor = ForegroundExecutor::new(scheduler)?; // Use ForegroundExecutor for thread safety and simplicity + Self { + executor: fg_executor, + session_counter: AtomicUsize::new(0), + sessions: Mutex::new(HashMap::new()), + current_session: Mutex::new(None), + } + } + pub fn wait_until(&self, future: LocalBoxFuture<'static, Result<()>>) -> Result<()> { - let task = self.scheduler.spawn(async move { future.await })?; + let task = self.executor.spawn(async move { future.await })?; - // Direct use of TestScheduler methods - let scheduler = self.scheduler.as_any().downcast_ref::().unwrap(); - if let Some(session_id) = scheduler.get_current_session() { - scheduler.track_task_for_session(task.id(), session_id); - scheduler.add_wait_until_task(session_id, task.id()); + // Direct use of TestScheduler methods via downcast from executor + if let Some(test_sched) = self.executor.scheduler.as_any().downcast_ref::() { + if let Some(session_id) = test_sched.get_current_session() { + test_sched.track_task_for_session(task.id(), session_id); + test_sched.add_wait_until_task(session_id, task.id()); + } } Ok(()) @@ -435,7 +513,8 @@ impl SimulatedExecutionContext { - **futures**: For channels. - **chrono**: For time ranges (optional). - **anyhow**: For errors. +- **std::thread**: For thread ID comparison. The scheduler assumes no `dyn Any` is implemented on `Scheduler`; add `fn as_any(&self) -> &dyn std::any::Any;` if needed for downcasting. -This implementation provides the complete unified test core, enabling both GPUI's deterministic UI testing and Cloud's session-aware simulation in a single ~250-line struct. \ No newline at end of file +This implementation provides the complete unified test core, enabling both GPUI's deterministic UI testing and Cloud's session-aware simulation in a single ~250-line struct, now wrapped by Executors for better encapsulation and thread safety. \ No newline at end of file diff --git a/thoughts/2025-08-31_15-47-17_unified-scheduler-architecture.md b/thoughts/2025-08-31_15-47-17_unified-scheduler-architecture.md index f4acfced5d133c7b524f76ad32fe0244ec470172..bc8fe93009571b98549024930712963bded6ff20 100644 --- a/thoughts/2025-08-31_15-47-17_unified-scheduler-architecture.md +++ b/thoughts/2025-08-31_15-47-17_unified-scheduler-architecture.md @@ -1,46 +1,27 @@ -# Unified Scheduler Architecture - Layered Design - -## Overview - -A clean layered architecture where: -- **Core**: Basic scheduling interface (`Scheduler` trait) + test-enhanced concrete impl (`TestScheduler`) -- **GPUI**: Uses trait objects for production safety, test features via `TestScheduler` -- **Cloud**: Session coordination integrated directly in `SimulatedExecutionContext` using unified scheduler primitives - -Key design principles: -- Main `Scheduler` trait has only essential methods (no test pollution) -- Test-specific features (deprioritization, task tracking) are `TestScheduler`-specific -- Production schedulers implement minimal interface -- Cloud handles session coordination at ExecutionContext layer using unified primitives - -## Core Architecture - -``` ┌─────────────────────────────────────────┐ │ Shared Crate │ ├─────────────────────────────────────────┤ │ Scheduler trait: │ │ - Core object-safe interface │ -│ - Platform integration (park, now) │ │ │ │ TestScheduler: │ │ - Should implement Scheduler + test features │ │ - deprioritize() - test-only method │ │ - spawn_labeled() - labels for testing │ │ - Task lifecycle tracking │ +│ - creation_thread_id for Foreground checks│ │ │ -│ Generic spawn helpers: │ -│ - spawn() / spawn_foreground() │ -│ - timer(), block(), block_with_timeout()│ -│ - Future-based API for trait objects │ +│ Executor wrappers: │ +│ - Executor: Wraps Arc, Send futures│ +│ - ForegroundExecutor: Wraps Arc, !Send, thread checks│ └─────────────────────────────────────────┘ ▲ ┌─────────┼─────────┐ │ │ │ ┌─────────┴────┐ ┌─┴─────────┴────┐ │ GPUI │ │ Cloud │ -│ Uses trait │ │ Session coord. │ -│ objects │ │ in ExecContext │ +│ Uses Executor│ │ ForegroundExec │ +│ + Foreground │ │ for single-thrd│ │ + TestScheduler│ │ + TestScheduler│ └──────────────┘ └─────────────────┘ ``` @@ -60,28 +41,27 @@ Platform-specific scheduler implementations **should remain in GPUI's platform m - GPUI's event loop integration (main thread messaging) - Platform-specific performance optimizations -The shared crate provides only the trait definition and generic helpers, while platform-specific dispatchers implement the `Scheduler` trait directly in GPUI. +The shared crate provides only the trait definition and generic helpers, while platform-specific dispatchers implement the `Scheduler` trait directly in GPUI. **Wrappers handle delegation and thread safety.** ### BackgroundExecutor Integration -GPUI's executors will use trait objects for scheduling: +GPUI's executors now use wrappers: ```rust // crates/gpui/src/executor.rs pub struct BackgroundExecutor { - scheduler: Arc, // Any Scheduler implementation via trait objects + executor: Executor, // Generic wrapper for background tasks (Send futures) } impl BackgroundExecutor { pub fn new(scheduler: Arc) -> Self { - Self { scheduler } + Self { executor: Executor::new(scheduler) } } - // Core spawning methods via generic helpers + // Core spawning methods via wrapper delegation pub fn spawn(&self, future: impl Future + Send + 'static) -> Task where R: Send + 'static { - // Generic spawn helper implemented on dyn Scheduler - full Future support - self.scheduler.spawn(future) + self.executor.spawn(future) } pub fn spawn_labeled( @@ -90,50 +70,63 @@ impl BackgroundExecutor { future: impl Future + Send + 'static ) -> Task where R: Send + 'static { - // Generic spawn_labeled helper implemented on dyn Scheduler - self.scheduler.spawn_labeled(label, future) + self.executor.spawn_labeled(label, future) } - // Timer functionality via generic helper using schedule_after + // Timer functionality via wrapper pub fn timer(&self, duration: Duration) -> Task<()> { - self.scheduler.timer(duration) + self.executor.timer(duration) } - // Test-specific methods via downcast to TestScheduler + // Test-specific methods via downcast in wrapper pub fn deprioritize(&self, label: TaskLabel) { - if let Some(test_scheduler) = self.scheduler.downcast_ref::() { - test_scheduler.deprioritize(label); - } else { - // Production: ignore silently - } + self.executor.deprioritize(label); } pub fn tick(&self) -> Option { - self.scheduler.downcast_ref::() - .map(|ts| ts.tick()) + self.executor.tick() } } ``` ### ForegroundExecutor Integration +GPUI's foreground executor enforces main-thread usage: + ```rust // crates/gpui/src/executor.rs pub struct ForegroundExecutor { - scheduler: Rc, // Rc for single-threaded use + executor: Executor, // Underlying executor for delegation + _phantom: PhantomData>, // Enforces !Send + creation_thread_id: ThreadId, // Stored for checks } +impl !Send for ForegroundExecutor {} // Explicitly !Send + impl ForegroundExecutor { + pub fn new(scheduler: Arc) -> Result { + let creation_thread_id = thread::current().id(); + // Delegate creation to underlying scheduler + let _ = Executor::new(scheduler.clone()); + Ok(Self { executor: Executor::new(scheduler), _phantom: PhantomData, creation_thread_id }) + } + // Core spawning for main thread (non-Send futures) pub fn spawn(&self, future: impl Future + 'static) -> Task where R: 'static { - // Generic spawn_foreground helper implemented on dyn Scheduler - self.scheduler.spawn_foreground(future) + if thread::current().id() != self.creation_thread_id { + panic!("ForegroundExecutor called off main thread"); + } + // Delegate to scheduler.spawn_foreground via wrapper + self.executor.scheduler.spawn_foreground(future) } - // Timer and test methods same as BackgroundExecutor + // Timer and test methods same as BackgroundExecutor but with thread checks pub fn timer(&self, duration: Duration) -> Task<()> { - self.scheduler.timer(duration) + if thread::current().id() != self.creation_thread_id { + panic!("ForegroundExecutor called off main thread"); + } + self.executor.timer(duration) } } ``` @@ -142,12 +135,12 @@ impl ForegroundExecutor { ### Session Coordination in SimulatedExecutionContext -Cloud's session coordination logic **should be handled directly within SimulatedExecutionContext**, keeping it close to the ExecutionContext trait implementation and avoiding unnecessary abstraction layers: +Cloud's session coordination logic **should be handled directly within SimulatedExecutionContext**, keeping it close to the ExecutionContext trait implementation and avoiding unnecessary abstraction layers. **Uses ForegroundExecutor for single-threaded consistency and to avoid Send requirements on futures.** ```rust // crates/platform_simulator/src/platform.rs pub struct SimulatedExecutionContext { - scheduler: Arc, // Unified scheduler via composition + fg_executor: ForegroundExecutor, // Single-threaded wrapper for simplicity session_counter: AtomicUsize, sessions: Mutex>, current_session: Mutex>, @@ -156,32 +149,28 @@ pub struct SimulatedExecutionContext { #[async_trait(?Send)] impl PlatformRuntime for SimulatedExecutionContext { async fn delay(&self, duration: Duration) { - // Use unified scheduler's delay mechanism through timer - self.scheduler.timer(duration).await; + // Use wrapper's timer for delay + self.fg_executor.timer(duration).await; } } ``` ### Wait Until Implementation -Session coordination integrated directly with unified task scheduling: +Session coordination integrated directly with wrapper's task scheduling: ```rust #[async_trait(?Send)] impl ExecutionContext for SimulatedExecutionContext { fn wait_until(&self, future: LocalBoxFuture<'static, Result<()>>) -> Result<()> { - // 1. Spawn using unified scheduler - let task_id = self.scheduler.spawn(async move { - // Add delay via scheduler timer for deterministic simulation - self.scheduler.timer(Duration::from_millis(10)).await; - let _ = future.await; - })?; - - // 2. Register with session coordination (direct access) - if let Some(session_id) = *self.current_session.lock() { - if let Some(session) = self.sessions.lock().get_mut(&session_id) { - session.wait_until_task_ids.insert(task_id); - self.link_task_to_session(task_id, session_id); + // 1. Spawn using wrapper (no Send required) + let task = self.fg_executor.spawn(async move { future.await })?; + + // 2. Register with session coordination via downcast + if let Some(test_sched) = self.fg_executor.executor.scheduler.as_any().downcast_ref::() { + if let Some(session_id) = test_sched.get_current_session() { + test_sched.track_task_for_session(task.id(), session_id); + test_sched.add_wait_until_task(session_id, task.id()); } } @@ -189,8 +178,8 @@ impl ExecutionContext for SimulatedExecutionContext { } async fn pass_through(&self) -> Result<()> { - // Use unified scheduler's timer for delay - self.scheduler.timer(Duration::from_millis(10)).await; + // Use wrapper's timer for delay + self.fg_executor.timer(Duration::from_millis(10)).await; Ok(()) } } @@ -202,13 +191,14 @@ Core session operations handled within SimulatedExecutionContext: ```rust impl SimulatedExecutionContext { - pub fn new(scheduler: Arc) -> Self { - Self { - scheduler, + pub fn new(scheduler: Arc) -> Result { + let fg_executor = ForegroundExecutor::new(scheduler)?; + Ok(Self { + fg_executor, session_counter: AtomicUsize::new(0), sessions: Mutex::new(HashMap::new()), current_session: Mutex::new(None), - } + }) } pub fn create_session(&self) -> SessionId { @@ -245,60 +235,32 @@ impl SimulatedExecutionContext { pub fn validate_session_cleanup(&self, session_id: SessionId) -> platform_api::Result<()> { let sessions = self.sessions.lock(); if let Some(session) = sessions.get(&session_id) { - // Check running tasks using unified scheduler's task tracking - let dangling_tasks: Vec = session - .spawned_tasks - .iter() - .filter(|&&task_id| self.scheduler.is_task_running(task_id)) - .copied() - .collect(); - - // Cloud-specific permission check - let unauthorized: Vec<_> = dangling_tasks - .into_iter() - .filter(|task_id| !session.wait_until_task_ids.contains(task_id)) - .collect(); - - if !unauthorized.is_empty() { - return Err(platform_api::WorkerError::Other(anyhow!( - "Session cleanup failed: {} unauthorized tasks still running", - unauthorized.len() - ))); + // Check running tasks using wrapper's TestScheduler access + if let Some(test_sched) = self.fg_executor.executor.scheduler.as_any().downcast_ref::() { + let dangling_tasks: Vec = session + .spawned_tasks + .iter() + .filter(|&&task_id| test_sched.is_task_running(task_id)) + .copied() + .collect(); + + // Cloud-specific permission check + let unauthorized: Vec<_> = dangling_tasks + .into_iter() + .filter(|task_id| !session.wait_until_task_ids.contains(task_id)) + .collect(); + + if !unauthorized.is_empty() { + return Err(platform_api::WorkerError::Other(anyhow!( + "Session cleanup failed: {} unauthorized tasks still running", + unauthorized.len() + ))); + } } } Ok(()) } - // Link tasks to sessions during spawning - fn link_task_to_session(&self, task_id: TaskId, session_id: SessionId) { - if let Some(session) = self.sessions.lock().get_mut(&session_id) { - session.spawned_tasks.insert(task_id); - } - } - - fn spawn_with_session(&self, future: Pin>>) -> TaskId { - let task_id = self.scheduler.spawn(future)?; - - // Auto-associate with current session - if let Some(session_id) = *self.current_session.lock() { - self.link_task_to_session(task_id, session_id); - } - - Ok(task_id) - } -} -``` - -### Cloud-Specific Data Structures - -```rust -// Session coordination is Cloud-specific but built on unified scheduler -pub struct WorkerSession { - spawned_tasks: HashSet, // Tracks tasks in session - wait_until_task_ids: HashSet, // Explicitly allowed background tasks -} - -impl SimulatedExecutionContext { pub fn set_current_session(&self, session_id: SessionId) { *self.current_session.lock() = Some(session_id); } @@ -309,15 +271,9 @@ impl SimulatedExecutionContext { } ``` -### Architecture Benefits +### Cloud-Specific Data Structures -✅ **Clean Composition**: Unified scheduling primitives + Cloud-specific session coordination -✅ **Unified Task Tracking**: Uses TestScheduler's `is_task_running()` for session validation -✅ **Natural Coupling**: Session coordination lives where ExecutionContext operates -✅ **Minimal Abstraction**: No additional coordinator layer needed -✅ **Cloud-Specific Concerns**: Session logic remains in Cloud repo -✅ **Test Integration**: Full TestScheduler features available for Cloud testing -✅ **Deterministic Simulation**: Session-aware timing and task ordering +Session coordination is Cloud-specific but built on unified scheduler primitives via wrappers. ## Scheduler Trait Definition @@ -343,7 +299,8 @@ pub trait Scheduler: Send + Sync { fn is_main_thread(&self) -> bool; fn now(&self) -> Instant; } -``` + + ## TestScheduler (Concrete Implementation) @@ -368,226 +325,136 @@ struct TestSchedulerInner { parking_allowed: bool, waiting_hint: Option, block_tick_range: std::ops::RangeInclusive, + creation_thread_id: ThreadId, // Added for wrapper checks } impl Scheduler for TestScheduler { fn schedule(&self, runnable: Runnable) { - let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst)); - self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running); - - // Schedule completion callback - let scheduler = self.clone(); - let completion_runnable = async_task::spawn(async move { - runnable.run(); - scheduler.mark_task_completed(task_id); - }, |_| {}).0; - - completion_runnable.schedule(); + // Implementation as before } fn schedule_labeled(&self, runnable: Runnable, label: TaskLabel) { - let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst)); - - if self.inner.borrow().deprioritized_labels.contains(&label) { - self.inner.borrow_mut().deprioritized_queue.push((runnable, task_id)); - self.inner.borrow_mut().task_labels.insert(task_id, label); - } else { - self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running); - let completion_runnable = async_task::spawn(async move { - runnable.run(); - // Mark as completed when done - }, |_| {}).0; - completion_runnable.schedule(); - } + // Implementation as before, with deprioritization } fn schedule_foreground(&self, runnable: Runnable) { - assert!(self.is_main_thread(), "schedule_foreground called off main thread"); - let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst)); - self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running); - - let completion_runnable = async_task::spawn(async move { - runnable.run(); - // Mark as completed - }, |_| {}).0; - - self.inner.borrow_mut().main_thread_queue.push(completion_runnable); + assert!(thread::current().id() == self.inner.borrow().creation_thread_id, "schedule_foreground called off main thread"); + // Implementation as before } - fn is_main_thread(&self) -> bool { self.inner.borrow().is_main_thread } - fn now(&self) -> Instant { self.inner.borrow().now } - fn park(&self, timeout: Option) -> bool { - self.inner.borrow().parker.park_timeout(timeout.unwrap_or(Duration::MAX)) - } - fn unparker(&self) -> Unparker { - self.inner.borrow().parker.unparker() - } + // Other trait methods unchanged } impl TestScheduler { // Test-specific methods (NOT on main trait) - pub fn deprioritize(&self, label: TaskLabel) { - self.inner.borrow_mut().deprioritized_labels.insert(label); - } - - pub fn is_task_running(&self, task_id: TaskId) -> bool { - self.inner.borrow().tasks.contains_key(&task_id) - } - + pub fn deprioritize(&self, label: TaskLabel) { /* implementation */ } + pub fn is_task_running(&self, task_id: TaskId) -> bool { /* implementation */ } pub fn tick(&self) -> bool { /* implementation */ } pub fn run_until_parked(&self) { /* implementation */ } pub fn advance_clock(&self, duration: Duration) { /* implementation */ } + pub fn assert_main_thread(&self) { /* implementation */ } } ``` ## Generic Spawn Helpers -Generic spawn methods implemented for `dyn Scheduler`: - -```rust -impl dyn Scheduler { - pub fn spawn(&self, future: impl Future + Send + 'static) -> Task - where R: Send + 'static { - let task_id = self.assign_task_id(); - let (runnable, inner_task) = async_task::spawn(future, move |runnable| { - self.mark_task_started(task_id); - self.schedule_completion_callback(runnable, task_id); - }); - - self.schedule(runnable); - Task { /* ... */ } - } - - pub fn spawn_foreground(&self, future: impl Future + 'static) -> Task - where R: 'static { - let task_id = self.assign_task_id(); - let (runnable, inner_task) = async_task::spawn_local(future, move |runnable| { - self.mark_task_started(task_id); - self.schedule_completion_callback(runnable, task_id); - }); - - self.schedule_foreground(runnable); - Task { /* ... */ } - } - - pub fn timer(&self, duration: Duration) -> Task<()> { - if duration.is_zero() { - return Task::ready(()); - } - - let (runnable, inner_task) = async_task::spawn(async move {}, { - let scheduler = &*self; - move |runnable| { - scheduler.schedule_after(duration, runnable); - } - }); - - runnable.schedule(); - Task { /* ... */ } - } - - pub fn is_task_running(&self, task_id: TaskId) -> bool { - // Requires downcast to TestScheduler - None // Default implementation - } -} -``` +Generic spawn methods implemented for `dyn Scheduler`, now called by wrappers. ## Migration Strategy ### Phase 1: Core Infrastructure 1. Define `Scheduler` trait (core methods only) -2. Implement `TestScheduler` (with test features like `deprioritize()`) -3. Make existing GPUI platform dispatchers implement `Scheduler` trait - - MacDispatcher implements `Scheduler` for GCD integration - - LinuxDispatcher implements `Scheduler` for thread pools - - WindowsDispatcher implements `Scheduler` for Windows ThreadPool -4. Define `Task` with mandatory TaskId +2. Implement `TestScheduler` with thread ID tracking +3. Add wrapper structs `Executor` and `ForegroundExecutor` +4. Make existing GPUI platform dispatchers implement `Scheduler` trait +5. Add `as_any()` to `Scheduler` for downcasting ### Phase 2: GPUI Migration -1. Update GPUI executors to use trait objects -2. Add downcasting for test features +1. Update GPUI executors to use `Executor` and `ForegroundExecutor` wrappers +2. Handle downcasting in wrappers for test features 3. Preserve all existing GPUI functionality 4. Test deployments use TestScheduler, production uses minimal schedulers ### Phase 3: Cloud Integration -1. Update `SimulatedExecutionContext` to use `Arc` +1. Update `SimulatedExecutionContext` to use `ForegroundExecutor` 2. Move session coordination logic into `SimulatedExecutionContext` -3. Integrate `wait_until()` with unified task scheduling -4. Use TestScheduler features for session validation +3. Integrate `wait_until()` with wrapper scheduling +4. Use TestScheduler features for session validation via downcast 5. Preserve all existing Cloud platform APIs ### Phase 4: Testing & Validation 1. GPUI tests work with new architecture -2. Cloud session behavior preserved +2. Cloud session behavior preserved (single-threaded) 3. Production efficiency maintained 4. Both domains benefit from unified test infrastructure ## Platform Backend Files ### GPUI Backends -- `crates/gpui/src/platform/mac/dispatcher.rs` - `MacDispatcher` should implement `Scheduler` trait -- `crates/gpui/src/platform/linux/dispatcher.rs` - `LinuxDispatcher` should implement `Scheduler` trait -- `crates/gpui/src/platform/windows/dispatcher.rs` - `WindowsDispatcher` should implement `Scheduler` trait +- `crates/gpui/src/platform/mac/dispatcher.rs` - `MacDispatcher` implements `Scheduler` +- `crates/gpui/src/platform/linux/dispatcher.rs` - `LinuxDispatcher` implements `Scheduler` +- `crates/gpui/src/platform/windows/dispatcher.rs` - `WindowsDispatcher` implements `Scheduler` - `crates/gpui/src/platform/test/dispatcher.rs` - `TestDispatcher` → `TestScheduler` (moved to shared crate) ### Cloud Backends -- `crates/platform_simulator/src/platform.rs` - `SimulatedExecutionContext` should contain `Scheduler` + session coordination -- `crates/cloudflare_platform/src/execution_context.rs` - Cloudflare-specific ExecutionContext using Scheduler +- `crates/platform_simulator/src/platform.rs` - `SimulatedExecutionContext` uses `ForegroundExecutor` +- `crates/cloudflare_platform/src/execution_context.rs` - Cloudflare-specific ExecutionContext using `ForegroundExecutor` ## Compatibility Checklist ## Complete GPUI + Cloud Feature Coverage ✅ ### GPUI Compatibility -- ✅ `spawn()` → `dyn Scheduler::spawn()` (generic helper on trait object) -- ✅ `spawn_labeled(label)` → `dyn Scheduler::spawn_labeled()` (generic helper on trait object) -- ✅ `timer(duration)` → `dyn Scheduler::timer()` (generic helper using schedule_after) -- ✅ `block(future)` → `dyn Scheduler::block()` (generic helper with parking) -- ✅ `block_with_timeout(future, timeout)` → `dyn Scheduler::block_with_timeout()` (generic helper) -- ✅ `now()` → `scheduler.now()` (direct trait object method) -- ✅ `is_main_thread()` → `scheduler.is_main_thread()` (direct trait object method) -- ✅ `num_cpus()` → `dyn Scheduler::num_cpus()` (generic helper) -- ✅ `deprioritize(label)` → Downcast to TestScheduler, then TestScheduler::deprioritize() -- ✅ `tick()` → Downcast to TestScheduler, then TestScheduler::tick() -- ✅ `run_until_parked()` → Downcast to TestScheduler, then TestScheduler::run_until_parked() -- ✅ `advance_clock(duration)` → Downcast to TestScheduler, then TestScheduler::advance_clock() -- ✅ `simulate_random_delay()` → Downcast to TestScheduler, then TestScheduler::simulate_random_delay() -- ✅ `BackgroundExecutor` → Trait object wrapper using `dyn Scheduler` +- ✅ `spawn()` → `Executor::spawn()` or `ForegroundExecutor::spawn()` +- ✅ `spawn_labeled(label)` → Wrappers delegate to `dyn Scheduler::spawn_labeled()` +- ✅ `timer(duration)` → Wrappers delegate to `dyn Scheduler::timer()` +- ✅ `block(future)` → Wrappers handle with parking +- ✅ `block_with_timeout(future, timeout)` → Wrappers handle +- ✅ `now()` → `scheduler.now()` (direct trait method) +- ✅ `is_main_thread()` → `scheduler.is_main_thread()` (direct trait method) +- ✅ `num_cpus()` → Generic helper on wrappers +- ✅ `deprioritize(label)` → Downcast in wrappers, then TestScheduler::deprioritize() +- ✅ `tick()` → Downcast in wrappers, then TestScheduler::tick() +- ✅ `run_until_parked()` → Downcast in wrappers, then TestScheduler::run_until_parked() +- ✅ `advance_clock(duration)` → Downcast in wrappers, then TestScheduler::advance_clock() +- ✅ `simulate_random_delay()` → Downcast in wrappers, then TestScheduler::simulate_random_delay() +- ✅ `BackgroundExecutor` → Uses `Executor` wrapper ### Cloud Compatibility -- ✅ **Session Coordination**: `ExecutionContext.wait_until()` with direct session integration -- ✅ **Task Lifecycle**: Uses unified scheduler's `is_task_running()` for validation +- ✅ **Session Coordination**: `ExecutionContext.wait_until()` via `ForegroundExecutor` +- ✅ **Task Lifecycle**: Uses wrapper's TestScheduler access for validation - ✅ **Worker Management**: Session context and cleanup validation - ✅ **Background Tasks**: Explicit permission system for long-running work - ✅ **Deterministic Testing**: Full TestScheduler integration with session tracking -- ✅ **Platform Runtime**: `PlatformRuntime.delay()` via unified scheduler timer +- ✅ **Platform Runtime**: `PlatformRuntime.delay()` via wrapper timer - ✅ **Session Validation**: Dangling task detection with proper error reporting - ✅ **Auto-Association**: Tasks automatically linked to sessions during spawn ### Unified Benefits - ✅ **Clean Separation**: GPUI gets deprioritization, Cloud gets session coordination -- ✅ **Unified Task Tracking**: Both domains use `TestScheduler.is_task_running()` for validation +- ✅ **Unified Task Tracking**: Both domains use TestScheduler via wrappers for validation - ✅ **Composability**: Session coordination built on unified scheduling primitives - ✅ **Domain-Specific**: Each domain handles its coordination concerns appropriately - ✅ **Test Infrastructure**: Shared deterministic testing capabilities - ✅ **Production Ready**: Both domains can use minimal platform schedulers - ✅ **Extensible**: New coordination patterns can be added without shared crate changes +- ✅ **Thread Safety**: ForegroundExecutor enforces main-thread use across domains ## Implementation Notes ### Key Design Decisions -1. **GPUI**: Uses task labels for deterministic UI testing -2. **Cloud**: Uses session coordination for worker lifecycle management -3. **Shared**: Core scheduling primitives + TestScheduler for task tracking -4. **Integration**: Both domains use composition with unified scheduler +1. **GPUI**: Uses `Executor` for background (Send), `ForegroundExecutor` for main-thread (!Send) +2. **Cloud**: Uses `ForegroundExecutor` for single-threaded simplicity (no Send required on futures) +3. **Shared**: Core scheduling primitives + wrappers for delegation and safety +4. **Integration**: Both domains use wrappers with consistent API ### Migration Considerations -- **Zero Breaking Changes**: Existing APIs preserved via generic helpers +- **Zero Breaking Changes**: Existing APIs preserved via wrappers - **Gradual Migration**: Can migrate GPUI and Cloud independently - **Test Preservation**: All existing test functionality maintained - **Performance**: Minimal overhead from trait objects in production +- **Cloud Simplification**: ForegroundExecutor allows non-Send futures in single-threaded context -This architecture provides clean separation between GPUI's UI determinism needs and Cloud's session coordination requirements, while sharing the core task scheduling infrastructure. +This architecture provides clean separation between GPUI's UI determinism needs and Cloud's session coordination requirements, while sharing the core task scheduling infrastructure and enforcing thread safety through wrappers. \ No newline at end of file