Update unified scheduler architecture to use object-safe trait with generic spawn helpers

Nathan Sobo created

- Make Scheduler trait object-safe by replacing generic spawn<R>() with schedule(runnable)
- Add generic spawn helpers on dyn Scheduler to preserve Future-based API
- Move task state management internally to scheduler implementations
- Update GPUI integration to use trait objects with spawn helpers
- Enhance Cloud integration with automatic session association
- Resolve Runnable vs Future interface mismatch while maintaining backwards compatibility

Change summary

thoughts/2025-08-31_15-47-17_unified-scheduler-architecture.md | 499 ++-
1 file changed, 329 insertions(+), 170 deletions(-)

Detailed changes

thoughts/2025-08-31_15-47-17_unified-scheduler-architecture.md 🔗

@@ -47,24 +47,15 @@ Key design principles:
 
 ```rust
 pub trait Scheduler: Send + Sync {
-    /// Spawn Send future (core functionality)
-    fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>;
+    /// Schedule a runnable to be executed (object-safe core functionality)
+    fn schedule(&self, runnable: Runnable);
 
-    /// Spawn Send future with label (defaults to ignoring label)
-    fn spawn_labeled<R>(
-        &self,
-        _label: TaskLabel,
-        future: impl Future<Output = R> + Send + 'static
-    ) -> Task<R>
-    where R: Send + 'static {
-        // Default: ignore label and just spawn normally
-        self.spawn(future)
-    }
+    /// Schedule a runnable with label for test tracking
+    fn schedule_labeled(&self, runnable: Runnable, label: TaskLabel);
 
-    /// Spawn non-Send future on main thread (optional, defaults to panic)
-    fn spawn_foreground<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
-    where R: 'static {
-        panic!("spawn_foreground not supported by this scheduler");
+    /// Schedule a runnable on the main thread (optional, defaults to panic)
+    fn schedule_foreground(&self, runnable: Runnable) {
+        panic!("schedule_foreground not supported by this scheduler");
     }
 
     /// Platform integration methods
@@ -75,6 +66,100 @@ pub trait Scheduler: Send + Sync {
 }
 ```
 
+**Explanation:**
+- Core trait methods are object-safe (no generic parameters)
+- `schedule` methods operate on `Runnable` for low-level execution control
+- Scheduler implementations manage task state internally when scheduling runnables
+- No task completion hooks needed on `Task` - scheduler tracks running tasks itself
+
+## Generic Spawn Helpers
+
+Generic spawn methods are implemented for `dyn Scheduler` to provide the high-level `Future` interface:
+
+```rust
+impl dyn Scheduler {
+    /// Spawn Send future (generic helper)
+    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
+    where R: Send + 'static {
+        let task_id = self.assign_task_id();
+        let task_metadata = TaskMetadata {
+            id: task_id,
+            label: None,
+            session: None,
+            spawn_location: std::panic::Location::caller(),
+        };
+
+        let (runnable, inner_task) = async_task::spawn(future, move |runnable| {
+            // Scheduler manages task lifecycle: mark as started when scheduled
+            self.mark_task_started(task_id);
+            // When runnable completes, scheduler marks as finished
+            self.schedule_completion_callback(runnable, task_id);
+        });
+
+        // Schedule the runnable (this adds to running tasks)
+        self.schedule(runnable);
+
+        Task {
+            inner: TaskState::Spawned(inner_task),
+            metadata: task_metadata,
+        }
+    }
+
+    /// Spawn Send future with label (generic helper)
+    pub fn spawn_labeled<R>(
+        &self,
+        label: TaskLabel,
+        future: impl Future<Output = R> + Send + 'static
+    ) -> Task<R>
+    where R: Send + 'static {
+        let task_id = self.assign_task_id();
+        let task_metadata = TaskMetadata {
+            id: task_id,
+            label: Some(label),
+            session: None,
+            spawn_location: std::panic::Location::caller(),
+        };
+
+        let (runnable, inner_task) = async_task::spawn(future, move |runnable| {
+            self.mark_task_started(task_id);
+            self.schedule_completion_callback(runnable, task_id);
+        });
+
+        // Apply test-specific logic (e.g., deprioritization) in scheduler
+        self.schedule_labeled(runnable, label);
+
+        Task {
+            inner: TaskState::Spawned(inner_task),
+            metadata: task_metadata,
+        }
+    }
+
+    /// Spawn non-Send future on main thread (generic helper)
+    pub fn spawn_foreground<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
+    where R: 'static {
+        let task_id = self.assign_task_id();
+        let task_metadata = TaskMetadata {
+            id: task_id,
+            label: None,
+            session: None,
+            spawn_location: std::panic::Location::caller(),
+        };
+
+        let (runnable, inner_task) = async_task::spawn_local(future, move |runnable| {
+            self.mark_task_started(task_id);
+            self.schedule_completion_callback(runnable, task_id);
+        });
+
+        self.schedule_foreground(runnable);
+
+        Task {
+            inner: TaskState::Spawned(inner_task),
+            metadata: task_metadata,
+        }
+    }
+}
+```
+
 **Explanation:**
 - Core trait has only essential methods
 - No test-specific methods (deprioritize stays off main trait)
@@ -119,7 +204,10 @@ pub struct TestScheduler {
 
 struct TestSchedulerInner {
     tasks: HashMap<TaskId, TaskState>,
+    task_labels: HashMap<TaskId, TaskLabel>,
     deprioritized_labels: HashSet<TaskLabel>,  // Test-specific state
+    deprioritized_queue: VecDeque<(Runnable, TaskId)>,
+    main_thread_queue: VecDeque<Runnable>,
     delayed: Vec<(Instant, Runnable)>,
     parker: Parker,
     is_main_thread: bool,
@@ -128,42 +216,45 @@ struct TestSchedulerInner {
 }
 
 impl Scheduler for TestScheduler {
-    fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R> {
-        let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst));
-        let task = self.create_task(future, task_id);
+    fn schedule(&self, runnable: Runnable) {
+        let task_id = self.next_task_id.fetch_add(1, Ordering::SeqCst);
         self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running);
-        task
-    }
 
-    fn spawn_foreground<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R> {
-        assert!(self.is_main_thread(), "spawn_foreground called off main thread");
-        let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst));
-        let task = self.create_local_task(future, task_id);
-        self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running);
-        task
+        // Schedule the runnable and setup completion callback
+        let scheduler = self.clone();
+        let completion_runnable = self.create_completion_runnable(runnable, task_id);
+        completion_runnable.schedule();
     }
 
-    fn is_main_thread(&self) -> bool { self.inner.borrow().is_main_thread }
-    fn now(&self) -> Instant { self.inner.borrow().now }
-    fn park(&self, timeout: Option<Duration>) -> bool {
-        self.inner.borrow().parker.park_timeout(timeout.unwrap_or(Duration::MAX))
-    }
-    fn spawn_labeled<R>(
-        &self,
-        label: TaskLabel,
-        future: impl Future<Output = R> + Send + 'static
-    ) -> Task<R>
-    where R: Send + 'static {
-        let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst));
-        let task = self.create_task_with_label(future, task_id, label);
+    fn schedule_labeled(&self, runnable: Runnable, label: TaskLabel) {
+        let task_id = self.next_task_id.fetch_add(1, Ordering::SeqCst);
 
         // Apply deprioritization if label is registered
         if self.inner.borrow().deprioritized_labels.contains(&label) {
-            self.move_to_deprioritized_queue(task_id);
+            // Store label association and put in deprioritized queue
+            self.inner.borrow_mut().deprioritized_queue.push((runnable, task_id));
+            self.inner.borrow_mut().task_labels.insert(task_id, label);
+        } else {
+            self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running);
+            let completion_runnable = self.create_completion_runnable(runnable, task_id);
+            completion_runnable.schedule();
         }
+    }
 
+    fn schedule_foreground(&self, runnable: Runnable) {
+        assert!(self.is_main_thread(), "schedule_foreground called off main thread");
+        let task_id = self.next_task_id.fetch_add(1, Ordering::SeqCst);
         self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running);
-        task
+
+        let completion_runnable = self.create_completion_runnable(runnable, task_id);
+        // Schedule on main thread queue
+        self.inner.borrow_mut().main_thread_queue.push(completion_runnable);
+    }
+
+    fn is_main_thread(&self) -> bool { self.inner.borrow().is_main_thread }
+    fn now(&self) -> Instant { self.inner.borrow().now }
+    fn park(&self, timeout: Option<Duration>) -> bool {
+        self.inner.borrow().parker.park_timeout(timeout.unwrap_or(Duration::MAX))
     }
 
     fn unparker(&self) -> Unparker {
@@ -171,81 +262,61 @@ impl Scheduler for TestScheduler {
     }
 }
 
-// Test-specific methods (NOT on main trait)
 impl TestScheduler {
-    pub fn deprioritize(&self, label: TaskLabel) {
-        self.inner.borrow_mut().deprioritized_labels.insert(label);
+    fn assign_task_id(&self) -> TaskId {
+        TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst))
     }
 
-    pub fn spawn_labeled<R>(
-        &self,
-        label: TaskLabel,
-        future: impl Future<Output = R> + Send + 'static
-    ) -> Task<R> {
-        let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst));
-        let mut task = self.create_task(future, task_id);
-        task.metadata.label = Some(label);  // Set label in metadata
-
-        // Apply deprioritization if label is registered
-        if self.inner.borrow().deprioritized_labels.contains(&label) {
-            self.move_to_deprioritized_queue(task_id);
-        }
-
-        self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running);
-        task
+    fn mark_task_started(&self, task_id: TaskId) {
+        // Task already marked running in schedule methods
     }
 
-    pub fn is_task_running(&self, task_id: TaskId) -> bool {
-        self.inner.borrow().tasks.contains_key(&task_id)
+    fn schedule_completion_callback(&self, runnable: Runnable, task_id: TaskId) -> Runnable {
+        let scheduler = self.clone();
+        async_task::spawn(async move {
+            // Run the original runnable
+            runnable.schedule();
+            // Mark task as completed when done
+            scheduler.mark_task_completed(task_id);
+        }, |_| {}).0
     }
 
-    fn create_task<R>(&self, future: impl Future<Output = R> + Send + 'static, task_id: TaskId) -> Task<R> {
-        let (runnable, inner_task) = async_task::spawn(future, move |runnable| {
-            // Schedule to appropriate queue based on label
-            self.schedule_runnable(runnable, task_id);
-        });
-        runnable.schedule();
+    fn mark_task_completed(&self, task_id: TaskId) {
+        self.inner.borrow_mut().tasks.remove(&task_id);
+    }
 
-        Task {
-            inner: TaskState::Spawned(inner_task),
-            id: task_id,
-            metadata: TaskMetadata {
-                label: None,
-                session: None,
-                spawn_location: Some(std::panic::Location::caller()),
-            },
-        }
+    fn create_completion_runnable(&self, runnable: Runnable, task_id: TaskId) -> Runnable {
+        let scheduler = self.clone();
+        async_task::spawn(async move {
+            runnable.schedule();
+            scheduler.mark_task_completed(task_id);
+        }, |_| {}).0
     }
+}
 
-    fn create_task_with_label<R>(&self, future: impl Future<Output = R> + Send + 'static, task_id: TaskId, label: TaskLabel) -> Task<R>
-    where R: Send + 'static {
-        let (runnable, inner_task) = async_task::spawn(future, move |runnable| {
-            self.schedule_runnable_with_label(runnable, task_id, label);
-        });
-        runnable.schedule();
+// Test-specific methods (NOT on main trait)
+impl TestScheduler {
+    pub fn deprioritize(&self, label: TaskLabel) {
+        self.inner.borrow_mut().deprioritized_labels.insert(label);
+    }
 
-        Task {
-            inner: TaskState::Spawned(inner_task),
-            id: task_id,
-            metadata: TaskMetadata {
-                label: Some(label),
-                session: None,
-                spawn_location: Some(std::panic::Location::caller()),
-            },
-        }
+    pub fn is_task_running(&self, task_id: TaskId) -> bool {
+        self.inner.borrow().tasks.contains_key(&task_id)
     }
 
-    fn schedule_runnable_with_label(&self, runnable: Runnable, task_id: TaskId, label: TaskLabel) {
-        // TestScheduler-specific scheduling logic for labeled tasks
-        if self.inner.borrow().deprioritized_labels.contains(&label) {
-            // Put in deprioritized queue for test determinism
-            self.inner.borrow_mut().deprioritized_queue.push(runnable);
-        } else {
-            // Schedule normally
-            runnable.schedule();
+    // Additional internal methods for task lifecycle management
+    fn move_to_deprioritized_queue(&self, task_id: TaskId) {
+        // Move task to deprioritized queue for deterministic testing
+        // This is called from deprioritize to move already scheduled tasks
+        if let Some(runnable) = self.inner.borrow_mut().tasks.remove(&task_id) {
+            self.inner.borrow_mut().deprioritized_queue.push_back((runnable, task_id));
         }
     }
 }
+
+    // Task creation now handled by generic spawn helpers
+    // Runnable scheduling managed internally by schedule methods
+}
 ```
 
 **Explanation:**
@@ -260,21 +331,25 @@ impl TestScheduler {
 pub struct GcdScheduler {
     main_queue: dispatch_queue_t,
     background_queue: dispatch_queue_t,
-    task_counter: AtomicUsize,
 }
 
 impl Scheduler for GcdScheduler {
-    fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R> {
-        let task_id = TaskId(self.task_counter.fetch_add(1, Ordering::SeqCst));
-        let (runnable, task) = async_task::spawn(future, move |runnable| {
-            unsafe { dispatch_async_f(self.background_queue, runnable.into_raw().as_ptr() as *mut c_void, Some(trampoline)); }
-        });
-        runnable.schedule();
+    fn schedule(&self, runnable: Runnable) {
+        unsafe {
+            dispatch_async_f(self.background_queue, runnable.into_raw().as_ptr() as *mut c_void, Some(trampoline));
+        }
+    }
 
-        Task {
-            inner: TaskState::Spawned(task),
-            id: task_id,
-            metadata: TaskMetadata::default(),
+    fn schedule_labeled(&self, runnable: Runnable, _label: TaskLabel) {
+        // Production scheduler ignores labels
+        unsafe {
+            dispatch_async_f(self.background_queue, runnable.into_raw().as_ptr() as *mut c_void, Some(trampoline));
+        }
+    }
+
+    fn schedule_foreground(&self, runnable: Runnable) {
+        unsafe {
+            dispatch_async_f(self.main_queue, runnable.into_raw().as_ptr() as *mut c_void, Some(trampoline));
         }
     }
 
@@ -288,9 +363,10 @@ impl Scheduler for GcdScheduler {
 ```
 
 **Explanation:**
-- Production schedulers implement only core `Scheduler` trait
-- No test-specific methods (deprioritize stays off main trait)
-- Minimal implementation, no task tracking overhead
+- Production schedulers implement object-safe `Scheduler` trait
+- No test-specific features or task state tracking
+- Minimal implementation with direct dispatch to GCD queues
+- Test features only available via `TestScheduler` wrapper in GPUI
 
 ## GPUI Integration
 
@@ -307,6 +383,7 @@ impl BackgroundExecutor {
 
     pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
     where R: Send + 'static {
+        // Generic spawn helper implemented on dyn Scheduler
         self.scheduler.spawn(future)
     }
 
@@ -316,6 +393,7 @@ impl BackgroundExecutor {
         future: impl Future<Output = R> + Send + 'static
     ) -> Task<R>
     where R: Send + 'static {
+        // Generic spawn_labeled helper implemented on dyn Scheduler
         self.scheduler.spawn_labeled(label, future)
     }
 
@@ -325,7 +403,7 @@ impl BackgroundExecutor {
         if let Some(test_scheduler) = self.scheduler.downcast_ref::<TestScheduler>() {
             test_scheduler.deprioritize(label);
         } else {
-            // Production: do nothing
+            // Production: do nothing (ignore test-only calls)
         }
     }
 }
@@ -338,23 +416,25 @@ pub struct ForegroundExecutor {
 impl ForegroundExecutor {
     pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
     where R: 'static {
+        // Generic spawn_foreground helper implemented on dyn Scheduler
         self.scheduler.spawn_foreground(future)
     }
 }
-```
 
 **Explanation:**
-- GPUI executors use trait objects for production safety
-- Test features accessed via downcasting to TestScheduler
-- Production deployments can use minimal schedulers
-- Test deployments get full test features
+- GPUI executors use trait objects for production safety and object-safe `Scheduler` trait
+- Generic spawn helpers provide the familiar Future-based API on `dyn Scheduler`
+- Object-safe schedule methods allow trait object usage without downcasting for basic operations
+- Test features still require downcasting to `TestScheduler` for deprioritization
+- Production deployments can use minimal schedulers via trait objects
+- Test deployments get full test features through TestScheduler wrapper
 
 ## Cloud Integration
 
 ```rust
-// Cloud wrapper requires TestScheduler for session features
+// Cloud wrapper requires TestScheduler for session features and task tracking
 pub struct CloudSimulatedScheduler {
-    test_scheduler: Arc<TestScheduler>,  // Concrete type for test features
+    scheduler: Arc<dyn Scheduler>,  // Object-safe scheduler (usually TestScheduler)
     inner: RefCell<CloudSimulatedSchedulerInner>,
 }
 
@@ -365,48 +445,110 @@ struct CloudSimulatedSchedulerInner {
 }
 
 impl CloudSimulatedScheduler {
-    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R> {
-        let task = self.test_scheduler.spawn(future);
+    pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
+        Self {
+            scheduler,
+            inner: RefCell::new(CloudSimulatedSchedulerInner {
+                current_session: None,
+                sessions: HashMap::new(),
+                task_to_session: HashMap::new(),
+            }),
+        }
+    }
+
+    // Use generic spawn helpers with session tracking
+    pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
+    where R: Send + 'static {
+        // Get task from generic spawn helper (includes task_id assignment)
+        let task = self.scheduler.spawn(future);
 
         // Auto-associate with current session
-        if let Some(session_id) = self.inner.borrow().current_session {
-            self.inner.borrow_mut().task_to_session.insert(task.id(), session_id);
-            // Track in session...
+        if let Some(session_id) = self.inner.borrow().current_session.clone() {
+            self.inner.borrow_mut().task_to_session.insert(task.metadata.id, session_id.clone());
+            // Track spawned task in session
+            if let Some(session) = self.inner.borrow_mut().sessions.get_mut(&session_id) {
+                session.spawned_tasks.push(task.metadata.id);
+            }
+        }
+
+        task
+    }
+
+    pub fn spawn_labeled<R>(
+        &self,
+        label: TaskLabel,
+        future: impl Future<Output = R> + Send + 'static
+    ) -> Task<R>
+    where R: Send + 'static {
+        // Use generic spawn_labeled helper
+        let task = self.scheduler.spawn_labeled(label, future);
+
+        // Auto-associate with current session
+        if let Some(session_id) = self.inner.borrow().current_session.clone() {
+            self.inner.borrow_mut().task_to_session.insert(task.metadata.id, session_id.clone());
+            // Track spawned task in session
+            if let Some(session) = self.inner.borrow_mut().sessions.get_mut(&session_id) {
+                session.spawned_tasks.push(task.metadata.id);
+            }
         }
 
         task
     }
 
     pub fn validate_session_cleanup(&self, session_id: SessionId) -> Result<()> {
-        // Use TestScheduler's task tracking for validation
-        let inner = self.inner.borrow();
-
-        if let Some(session) = inner.sessions.get(&session_id) {
-            let running_tasks: Vec<TaskId> = session
-                .spawned_tasks
-                .iter()
-                .filter(|&&task_id| self.test_scheduler.is_task_running(task_id))
-                .copied()
-                .collect();
-
-            // Check against explicit wait_until permissions
-            let unauthorized = running_tasks.difference(&session.wait_until_task_ids);
-
-            if unauthorized.next().is_some() {
-                return Err(anyhow!("Session cleanup failed: unauthorized tasks still running"));
+        // Use TestScheduler's internal task tracking for validation
+        if let Some(test_scheduler) = self.scheduler.downcast_ref::<TestScheduler>() {
+            let inner = self.inner.borrow();
+
+            if let Some(session) = inner.sessions.get(&session_id) {
+                let running_tasks: Vec<TaskId> = session
+                    .spawned_tasks
+                    .iter()
+                    .filter(|&&task_id| test_scheduler.is_task_running(task_id))
+                    .copied()
+                    .collect();
+
+                // Check against explicit wait_until permissions
+                let unauthorized = running_tasks.difference(&session.wait_until_task_ids);
+
+                if unauthorized.next().is_some() {
+                    return Err(anyhow!("Session cleanup failed: unauthorized tasks still running"));
+                }
             }
+        } else {
+            // Production scheduler: no task tracking available
+            return Err(anyhow!("Session validation requires TestScheduler"));
         }
 
         Ok(())
     }
+
+    // Session management methods
+    pub fn create_session(&self) -> SessionId {
+        let session_id = SessionId::new();
+        self.inner.borrow_mut().sessions.insert(session_id.clone(), SessionData {
+            spawned_tasks: Vec::new(),
+            wait_until_task_ids: HashSet::new(),
+        });
+        self.inner.borrow_mut().current_session = Some(session_id.clone());
+        session_id
+    }
+
+    pub fn add_wait_until_task(&self, session_id: SessionId, task_id: TaskId) {
+        if let Some(session) = self.inner.borrow_mut().sessions.get_mut(&session_id) {
+            session.wait_until_task_ids.insert(task_id);
+        }
+    }
 }
 ```
 
 **Explanation:**
-- Cloud requires `TestScheduler` because session features need task tracking
-- Auto-associates tasks with current session
-- Uses TestScheduler's `is_task_running()` for validation
-- Session coordination is test-focused infrastructure
+- Cloud wrapper uses object-safe `Scheduler` trait with generic spawn helpers
+- Internal task management: scheduler tracks running tasks, Cloud wrapper associates with sessions
+- Session tracking enhanced: tasks automatically associated via spawn helpers and metadata
+- Task lifecycle: scheduler manages completion internally, Cloud validates against running tasks
+- Test features: downcast to TestScheduler for `is_task_running()` validation
+- Production safety: uses trait objects, but session features require TestScheduler
 
 ## Migration Strategy
 
@@ -435,13 +577,28 @@ impl CloudSimulatedScheduler {
 
 ## Benefits
 
-✅ **Clean Separation**: Test methods only on TestScheduler
-✅ **Production Safety**: GPUI executors use trait objects
-✅ **Session Intelligence**: Cloud gets full coordination features
-✅ **Flexible Architecture**: Production vs test deployments
-✅ **Backward Compatibility**: All existing functionality preserved
+✅ **Object-Safe Trait**: Scheduler trait is object-safe, enabling trait objects without downcasting for core operations
+✅ **Internal Task Management**: Scheduler manages task lifecycle and completion state internally, providing unified task tracking
+✅ **Clean Separation**: Test methods only on TestScheduler, generic spawn helpers on trait objects
+✅ **Production Safety**: GPUI executors use trait objects with minimal dyn dispatch overhead
+✅ **Session Intelligence**: Cloud gets full coordination features with automatic task-session association via spawn helpers
+✅ **Flexible Architecture**: Production vs test deployments with scheduler implementations optimized for each context
+✅ **Backward Compatibility**: All existing functionality preserved via generic spawn helpers on `dyn Scheduler`
+
+This design keeps test concerns in TestScheduler while maintaining production safety and session coordination capabilities through internal scheduler task management.
+
+### Benefits of This Approach
+
+✅ **Interface Compatibility**: GPUI code continues using Future-based spawn, Cloud uses session-aware wrappers
+✅ **Performance**: Trait objects have minimal overhead, direct Runnable scheduling in production
+✅ **Separation of Concerns**: Low-level Runnable scheduling in trait, high-level Future API as helpers
+✅ **Object Safety**: Enables `Arc<dyn Scheduler>` usage without runtime downcasting for basic operations
+
+### Migration Impact
 
-This design keeps test concerns in TestScheduler while maintaining production safety and session coordination capabilities.
+- GPUI executors: Simple switch from `Arc<dyn PlatformDispatcher>` to `Arc<dyn Scheduler>`
+- Cloud wrapper: Enhanced to automatically associate tasks with sessions via spawn helpers
+- Test infrastructure: TestScheduler provides both low-level scheduling and task tracking internally
 
 ## Implementation Reference
 
@@ -483,14 +640,16 @@ This design keeps test concerns in TestScheduler while maintaining production sa
 ### Migration Points
 
 **GPUI Areas:**
-- Replace `cx.background_executor()` calls with new executors
-- Update any direct `PlatformDispatcher` usage
-- Preserve `spawn_labeled()` and `deprioritize()` APIs
+- Update GPUI executors to use `Arc<dyn Scheduler>` trait objects
+- Replace `PlatformDispatcher` usage with object-safe `Scheduler` methods and generic spawn helpers
+- Preserve `spawn_labeled()` and `deprioritize()` APIs via generic helpers and downcasting
+- Update `BackgroundExecutor` and `ForegroundExecutor` to call `dyn Scheduler` spawn helpers
 
 **Cloud Areas:**
-- Replace `SimulatorRuntime` usage in tests
-- Update session management to use new scheduler wrapper
-- Preserve `wait_until()` and session validation behavior
+- Replace `SimulatorRuntime` with `CloudSimulatedScheduler` wrapper around `dyn Scheduler`
+- Implement session management using wrapper's spawn helpers with automatic task association
+- Preserve `wait_until()` and session validation via downcast to TestScheduler for task tracking
+- Update `ExecutionContext` implementation to use new wrapper
 
 ### Test Files Impacted
 
@@ -521,18 +680,18 @@ This design keeps test concerns in TestScheduler while maintaining production sa
 ## Compatibility Checklist
 
 ### GPUI Compatibility
-- ✅ `spawn()` → `scheduler.spawn()`
-- ✅ `spawn_labeled(label)` → `scheduler.spawn_labeled(label)`
-- ✅ `deprioritize()` → Downcast to TestScheduler
-- ✅ `timer()` → `scheduler.timer()`
-- ✅ `BackgroundExecutor` → Trait object wrapper
+- ✅ `spawn()` → `dyn Scheduler::spawn()` (generic helper on trait object)
+- ✅ `spawn_labeled(label)` → `dyn Scheduler::spawn_labeled()` (generic helper on trait object)
+- ✅ `deprioritize()` → Downcast to TestScheduler, then `TestScheduler::deprioritize()`
+- ✅ `timer()` → `scheduler.timer()` (platform method on trait object)
+- ✅ `BackgroundExecutor` → Trait object wrapper using `dyn Scheduler`
 
 ### Cloud Compatibility
-- ✅ `ExecutionContext::wait_until()` → Scheduler wrapper
-- ✅ Session validation → `validate_session_cleanup()`
-- ✅ Automatic session association → Wrapper intelligence
-- ✅ Task cleanup checking → TestScheduler task tracking
-- ✅ `spawn()` in sessions → Auto-association
+- ✅ `ExecutionContext::wait_until()` → Scheduler wrapper with generic spawn helpers
+- ✅ Session validation → `validate_session_cleanup()` with downcast to TestScheduler
+- ✅ Automatic session association → Via spawn helpers and task metadata
+- ✅ Task cleanup checking → Internal scheduler task tracking (downcast to TestScheduler for running status)
+- ✅ `spawn()` in sessions → `dyn Scheduler::spawn()` with auto-association in wrapper
 
 ### Test Compatibility
 - ✅ Test determinism → TestScheduler deprioritization