diff --git a/thoughts/2025-08-31_15-47-17_unified-scheduler-architecture.md b/thoughts/2025-08-31_15-47-17_unified-scheduler-architecture.md index 26c7c640ac3679ea6e0ecbb5d12a731f72bfb527..5c065ba620e7aa1e6bbf1a425e989277b2c608dc 100644 --- a/thoughts/2025-08-31_15-47-17_unified-scheduler-architecture.md +++ b/thoughts/2025-08-31_15-47-17_unified-scheduler-architecture.md @@ -47,24 +47,15 @@ Key design principles: ```rust pub trait Scheduler: Send + Sync { - /// Spawn Send future (core functionality) - fn spawn(&self, future: impl Future + Send + 'static) -> Task; + /// Schedule a runnable to be executed (object-safe core functionality) + fn schedule(&self, runnable: Runnable); - /// Spawn Send future with label (defaults to ignoring label) - fn spawn_labeled( - &self, - _label: TaskLabel, - future: impl Future + Send + 'static - ) -> Task - where R: Send + 'static { - // Default: ignore label and just spawn normally - self.spawn(future) - } + /// Schedule a runnable with label for test tracking + fn schedule_labeled(&self, runnable: Runnable, label: TaskLabel); - /// Spawn non-Send future on main thread (optional, defaults to panic) - fn spawn_foreground(&self, future: impl Future + 'static) -> Task - where R: 'static { - panic!("spawn_foreground not supported by this scheduler"); + /// Schedule a runnable on the main thread (optional, defaults to panic) + fn schedule_foreground(&self, runnable: Runnable) { + panic!("schedule_foreground not supported by this scheduler"); } /// Platform integration methods @@ -75,6 +66,100 @@ pub trait Scheduler: Send + Sync { } ``` +**Explanation:** +- Core trait methods are object-safe (no generic parameters) +- `schedule` methods operate on `Runnable` for low-level execution control +- Scheduler implementations manage task state internally when scheduling runnables +- No task completion hooks needed on `Task` - scheduler tracks running tasks itself + +## Generic Spawn Helpers + +Generic spawn methods are implemented for `dyn Scheduler` to provide the high-level `Future` interface: + +```rust +impl dyn Scheduler { + /// Spawn Send future (generic helper) + pub fn spawn(&self, future: impl Future + Send + 'static) -> Task + where R: Send + 'static { + let task_id = self.assign_task_id(); + let task_metadata = TaskMetadata { + id: task_id, + label: None, + session: None, + spawn_location: std::panic::Location::caller(), + }; + + let (runnable, inner_task) = async_task::spawn(future, move |runnable| { + // Scheduler manages task lifecycle: mark as started when scheduled + self.mark_task_started(task_id); + // When runnable completes, scheduler marks as finished + self.schedule_completion_callback(runnable, task_id); + }); + + // Schedule the runnable (this adds to running tasks) + self.schedule(runnable); + + Task { + inner: TaskState::Spawned(inner_task), + metadata: task_metadata, + } + } + + /// Spawn Send future with label (generic helper) + pub fn spawn_labeled( + &self, + label: TaskLabel, + future: impl Future + Send + 'static + ) -> Task + where R: Send + 'static { + let task_id = self.assign_task_id(); + let task_metadata = TaskMetadata { + id: task_id, + label: Some(label), + session: None, + spawn_location: std::panic::Location::caller(), + }; + + let (runnable, inner_task) = async_task::spawn(future, move |runnable| { + self.mark_task_started(task_id); + self.schedule_completion_callback(runnable, task_id); + }); + + // Apply test-specific logic (e.g., deprioritization) in scheduler + self.schedule_labeled(runnable, label); + + Task { + inner: TaskState::Spawned(inner_task), + metadata: task_metadata, + } + } + + /// Spawn non-Send future on main thread (generic helper) + pub fn spawn_foreground(&self, future: impl Future + 'static) -> Task + where R: 'static { + let task_id = self.assign_task_id(); + let task_metadata = TaskMetadata { + id: task_id, + label: None, + session: None, + spawn_location: std::panic::Location::caller(), + }; + + let (runnable, inner_task) = async_task::spawn_local(future, move |runnable| { + self.mark_task_started(task_id); + self.schedule_completion_callback(runnable, task_id); + }); + + self.schedule_foreground(runnable); + + Task { + inner: TaskState::Spawned(inner_task), + metadata: task_metadata, + } + } +} +``` + **Explanation:** - Core trait has only essential methods - No test-specific methods (deprioritize stays off main trait) @@ -119,7 +204,10 @@ pub struct TestScheduler { struct TestSchedulerInner { tasks: HashMap, + task_labels: HashMap, deprioritized_labels: HashSet, // Test-specific state + deprioritized_queue: VecDeque<(Runnable, TaskId)>, + main_thread_queue: VecDeque, delayed: Vec<(Instant, Runnable)>, parker: Parker, is_main_thread: bool, @@ -128,42 +216,45 @@ struct TestSchedulerInner { } impl Scheduler for TestScheduler { - fn spawn(&self, future: impl Future + Send + 'static) -> Task { - let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst)); - let task = self.create_task(future, task_id); + fn schedule(&self, runnable: Runnable) { + let task_id = self.next_task_id.fetch_add(1, Ordering::SeqCst); self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running); - task - } - fn spawn_foreground(&self, future: impl Future + 'static) -> Task { - assert!(self.is_main_thread(), "spawn_foreground called off main thread"); - let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst)); - let task = self.create_local_task(future, task_id); - self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running); - task + // Schedule the runnable and setup completion callback + let scheduler = self.clone(); + let completion_runnable = self.create_completion_runnable(runnable, task_id); + completion_runnable.schedule(); } - fn is_main_thread(&self) -> bool { self.inner.borrow().is_main_thread } - fn now(&self) -> Instant { self.inner.borrow().now } - fn park(&self, timeout: Option) -> bool { - self.inner.borrow().parker.park_timeout(timeout.unwrap_or(Duration::MAX)) - } - fn spawn_labeled( - &self, - label: TaskLabel, - future: impl Future + Send + 'static - ) -> Task - where R: Send + 'static { - let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst)); - let task = self.create_task_with_label(future, task_id, label); + fn schedule_labeled(&self, runnable: Runnable, label: TaskLabel) { + let task_id = self.next_task_id.fetch_add(1, Ordering::SeqCst); // Apply deprioritization if label is registered if self.inner.borrow().deprioritized_labels.contains(&label) { - self.move_to_deprioritized_queue(task_id); + // Store label association and put in deprioritized queue + self.inner.borrow_mut().deprioritized_queue.push((runnable, task_id)); + self.inner.borrow_mut().task_labels.insert(task_id, label); + } else { + self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running); + let completion_runnable = self.create_completion_runnable(runnable, task_id); + completion_runnable.schedule(); } + } + fn schedule_foreground(&self, runnable: Runnable) { + assert!(self.is_main_thread(), "schedule_foreground called off main thread"); + let task_id = self.next_task_id.fetch_add(1, Ordering::SeqCst); self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running); - task + + let completion_runnable = self.create_completion_runnable(runnable, task_id); + // Schedule on main thread queue + self.inner.borrow_mut().main_thread_queue.push(completion_runnable); + } + + fn is_main_thread(&self) -> bool { self.inner.borrow().is_main_thread } + fn now(&self) -> Instant { self.inner.borrow().now } + fn park(&self, timeout: Option) -> bool { + self.inner.borrow().parker.park_timeout(timeout.unwrap_or(Duration::MAX)) } fn unparker(&self) -> Unparker { @@ -171,81 +262,61 @@ impl Scheduler for TestScheduler { } } -// Test-specific methods (NOT on main trait) impl TestScheduler { - pub fn deprioritize(&self, label: TaskLabel) { - self.inner.borrow_mut().deprioritized_labels.insert(label); + fn assign_task_id(&self) -> TaskId { + TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst)) } - pub fn spawn_labeled( - &self, - label: TaskLabel, - future: impl Future + Send + 'static - ) -> Task { - let task_id = TaskId(self.next_task_id.fetch_add(1, Ordering::SeqCst)); - let mut task = self.create_task(future, task_id); - task.metadata.label = Some(label); // Set label in metadata - - // Apply deprioritization if label is registered - if self.inner.borrow().deprioritized_labels.contains(&label) { - self.move_to_deprioritized_queue(task_id); - } - - self.inner.borrow_mut().tasks.insert(task_id, TaskState::Running); - task + fn mark_task_started(&self, task_id: TaskId) { + // Task already marked running in schedule methods } - pub fn is_task_running(&self, task_id: TaskId) -> bool { - self.inner.borrow().tasks.contains_key(&task_id) + fn schedule_completion_callback(&self, runnable: Runnable, task_id: TaskId) -> Runnable { + let scheduler = self.clone(); + async_task::spawn(async move { + // Run the original runnable + runnable.schedule(); + // Mark task as completed when done + scheduler.mark_task_completed(task_id); + }, |_| {}).0 } - fn create_task(&self, future: impl Future + Send + 'static, task_id: TaskId) -> Task { - let (runnable, inner_task) = async_task::spawn(future, move |runnable| { - // Schedule to appropriate queue based on label - self.schedule_runnable(runnable, task_id); - }); - runnable.schedule(); + fn mark_task_completed(&self, task_id: TaskId) { + self.inner.borrow_mut().tasks.remove(&task_id); + } - Task { - inner: TaskState::Spawned(inner_task), - id: task_id, - metadata: TaskMetadata { - label: None, - session: None, - spawn_location: Some(std::panic::Location::caller()), - }, - } + fn create_completion_runnable(&self, runnable: Runnable, task_id: TaskId) -> Runnable { + let scheduler = self.clone(); + async_task::spawn(async move { + runnable.schedule(); + scheduler.mark_task_completed(task_id); + }, |_| {}).0 } +} - fn create_task_with_label(&self, future: impl Future + Send + 'static, task_id: TaskId, label: TaskLabel) -> Task - where R: Send + 'static { - let (runnable, inner_task) = async_task::spawn(future, move |runnable| { - self.schedule_runnable_with_label(runnable, task_id, label); - }); - runnable.schedule(); +// Test-specific methods (NOT on main trait) +impl TestScheduler { + pub fn deprioritize(&self, label: TaskLabel) { + self.inner.borrow_mut().deprioritized_labels.insert(label); + } - Task { - inner: TaskState::Spawned(inner_task), - id: task_id, - metadata: TaskMetadata { - label: Some(label), - session: None, - spawn_location: Some(std::panic::Location::caller()), - }, - } + pub fn is_task_running(&self, task_id: TaskId) -> bool { + self.inner.borrow().tasks.contains_key(&task_id) } - fn schedule_runnable_with_label(&self, runnable: Runnable, task_id: TaskId, label: TaskLabel) { - // TestScheduler-specific scheduling logic for labeled tasks - if self.inner.borrow().deprioritized_labels.contains(&label) { - // Put in deprioritized queue for test determinism - self.inner.borrow_mut().deprioritized_queue.push(runnable); - } else { - // Schedule normally - runnable.schedule(); + // Additional internal methods for task lifecycle management + fn move_to_deprioritized_queue(&self, task_id: TaskId) { + // Move task to deprioritized queue for deterministic testing + // This is called from deprioritize to move already scheduled tasks + if let Some(runnable) = self.inner.borrow_mut().tasks.remove(&task_id) { + self.inner.borrow_mut().deprioritized_queue.push_back((runnable, task_id)); } } } + + // Task creation now handled by generic spawn helpers + // Runnable scheduling managed internally by schedule methods +} ``` **Explanation:** @@ -260,21 +331,25 @@ impl TestScheduler { pub struct GcdScheduler { main_queue: dispatch_queue_t, background_queue: dispatch_queue_t, - task_counter: AtomicUsize, } impl Scheduler for GcdScheduler { - fn spawn(&self, future: impl Future + Send + 'static) -> Task { - let task_id = TaskId(self.task_counter.fetch_add(1, Ordering::SeqCst)); - let (runnable, task) = async_task::spawn(future, move |runnable| { - unsafe { dispatch_async_f(self.background_queue, runnable.into_raw().as_ptr() as *mut c_void, Some(trampoline)); } - }); - runnable.schedule(); + fn schedule(&self, runnable: Runnable) { + unsafe { + dispatch_async_f(self.background_queue, runnable.into_raw().as_ptr() as *mut c_void, Some(trampoline)); + } + } - Task { - inner: TaskState::Spawned(task), - id: task_id, - metadata: TaskMetadata::default(), + fn schedule_labeled(&self, runnable: Runnable, _label: TaskLabel) { + // Production scheduler ignores labels + unsafe { + dispatch_async_f(self.background_queue, runnable.into_raw().as_ptr() as *mut c_void, Some(trampoline)); + } + } + + fn schedule_foreground(&self, runnable: Runnable) { + unsafe { + dispatch_async_f(self.main_queue, runnable.into_raw().as_ptr() as *mut c_void, Some(trampoline)); } } @@ -288,9 +363,10 @@ impl Scheduler for GcdScheduler { ``` **Explanation:** -- Production schedulers implement only core `Scheduler` trait -- No test-specific methods (deprioritize stays off main trait) -- Minimal implementation, no task tracking overhead +- Production schedulers implement object-safe `Scheduler` trait +- No test-specific features or task state tracking +- Minimal implementation with direct dispatch to GCD queues +- Test features only available via `TestScheduler` wrapper in GPUI ## GPUI Integration @@ -307,6 +383,7 @@ impl BackgroundExecutor { pub fn spawn(&self, future: impl Future + Send + 'static) -> Task where R: Send + 'static { + // Generic spawn helper implemented on dyn Scheduler self.scheduler.spawn(future) } @@ -316,6 +393,7 @@ impl BackgroundExecutor { future: impl Future + Send + 'static ) -> Task where R: Send + 'static { + // Generic spawn_labeled helper implemented on dyn Scheduler self.scheduler.spawn_labeled(label, future) } @@ -325,7 +403,7 @@ impl BackgroundExecutor { if let Some(test_scheduler) = self.scheduler.downcast_ref::() { test_scheduler.deprioritize(label); } else { - // Production: do nothing + // Production: do nothing (ignore test-only calls) } } } @@ -338,23 +416,25 @@ pub struct ForegroundExecutor { impl ForegroundExecutor { pub fn spawn(&self, future: impl Future + 'static) -> Task where R: 'static { + // Generic spawn_foreground helper implemented on dyn Scheduler self.scheduler.spawn_foreground(future) } } -``` **Explanation:** -- GPUI executors use trait objects for production safety -- Test features accessed via downcasting to TestScheduler -- Production deployments can use minimal schedulers -- Test deployments get full test features +- GPUI executors use trait objects for production safety and object-safe `Scheduler` trait +- Generic spawn helpers provide the familiar Future-based API on `dyn Scheduler` +- Object-safe schedule methods allow trait object usage without downcasting for basic operations +- Test features still require downcasting to `TestScheduler` for deprioritization +- Production deployments can use minimal schedulers via trait objects +- Test deployments get full test features through TestScheduler wrapper ## Cloud Integration ```rust -// Cloud wrapper requires TestScheduler for session features +// Cloud wrapper requires TestScheduler for session features and task tracking pub struct CloudSimulatedScheduler { - test_scheduler: Arc, // Concrete type for test features + scheduler: Arc, // Object-safe scheduler (usually TestScheduler) inner: RefCell, } @@ -365,48 +445,110 @@ struct CloudSimulatedSchedulerInner { } impl CloudSimulatedScheduler { - pub fn spawn(&self, future: impl Future + Send + 'static) -> Task { - let task = self.test_scheduler.spawn(future); + pub fn new(scheduler: Arc) -> Self { + Self { + scheduler, + inner: RefCell::new(CloudSimulatedSchedulerInner { + current_session: None, + sessions: HashMap::new(), + task_to_session: HashMap::new(), + }), + } + } + + // Use generic spawn helpers with session tracking + pub fn spawn(&self, future: impl Future + Send + 'static) -> Task + where R: Send + 'static { + // Get task from generic spawn helper (includes task_id assignment) + let task = self.scheduler.spawn(future); // Auto-associate with current session - if let Some(session_id) = self.inner.borrow().current_session { - self.inner.borrow_mut().task_to_session.insert(task.id(), session_id); - // Track in session... + if let Some(session_id) = self.inner.borrow().current_session.clone() { + self.inner.borrow_mut().task_to_session.insert(task.metadata.id, session_id.clone()); + // Track spawned task in session + if let Some(session) = self.inner.borrow_mut().sessions.get_mut(&session_id) { + session.spawned_tasks.push(task.metadata.id); + } + } + + task + } + + pub fn spawn_labeled( + &self, + label: TaskLabel, + future: impl Future + Send + 'static + ) -> Task + where R: Send + 'static { + // Use generic spawn_labeled helper + let task = self.scheduler.spawn_labeled(label, future); + + // Auto-associate with current session + if let Some(session_id) = self.inner.borrow().current_session.clone() { + self.inner.borrow_mut().task_to_session.insert(task.metadata.id, session_id.clone()); + // Track spawned task in session + if let Some(session) = self.inner.borrow_mut().sessions.get_mut(&session_id) { + session.spawned_tasks.push(task.metadata.id); + } } task } pub fn validate_session_cleanup(&self, session_id: SessionId) -> Result<()> { - // Use TestScheduler's task tracking for validation - let inner = self.inner.borrow(); - - if let Some(session) = inner.sessions.get(&session_id) { - let running_tasks: Vec = session - .spawned_tasks - .iter() - .filter(|&&task_id| self.test_scheduler.is_task_running(task_id)) - .copied() - .collect(); - - // Check against explicit wait_until permissions - let unauthorized = running_tasks.difference(&session.wait_until_task_ids); - - if unauthorized.next().is_some() { - return Err(anyhow!("Session cleanup failed: unauthorized tasks still running")); + // Use TestScheduler's internal task tracking for validation + if let Some(test_scheduler) = self.scheduler.downcast_ref::() { + let inner = self.inner.borrow(); + + if let Some(session) = inner.sessions.get(&session_id) { + let running_tasks: Vec = session + .spawned_tasks + .iter() + .filter(|&&task_id| test_scheduler.is_task_running(task_id)) + .copied() + .collect(); + + // Check against explicit wait_until permissions + let unauthorized = running_tasks.difference(&session.wait_until_task_ids); + + if unauthorized.next().is_some() { + return Err(anyhow!("Session cleanup failed: unauthorized tasks still running")); + } } + } else { + // Production scheduler: no task tracking available + return Err(anyhow!("Session validation requires TestScheduler")); } Ok(()) } + + // Session management methods + pub fn create_session(&self) -> SessionId { + let session_id = SessionId::new(); + self.inner.borrow_mut().sessions.insert(session_id.clone(), SessionData { + spawned_tasks: Vec::new(), + wait_until_task_ids: HashSet::new(), + }); + self.inner.borrow_mut().current_session = Some(session_id.clone()); + session_id + } + + pub fn add_wait_until_task(&self, session_id: SessionId, task_id: TaskId) { + if let Some(session) = self.inner.borrow_mut().sessions.get_mut(&session_id) { + session.wait_until_task_ids.insert(task_id); + } + } } ``` **Explanation:** -- Cloud requires `TestScheduler` because session features need task tracking -- Auto-associates tasks with current session -- Uses TestScheduler's `is_task_running()` for validation -- Session coordination is test-focused infrastructure +- Cloud wrapper uses object-safe `Scheduler` trait with generic spawn helpers +- Internal task management: scheduler tracks running tasks, Cloud wrapper associates with sessions +- Session tracking enhanced: tasks automatically associated via spawn helpers and metadata +- Task lifecycle: scheduler manages completion internally, Cloud validates against running tasks +- Test features: downcast to TestScheduler for `is_task_running()` validation +- Production safety: uses trait objects, but session features require TestScheduler ## Migration Strategy @@ -435,13 +577,28 @@ impl CloudSimulatedScheduler { ## Benefits -✅ **Clean Separation**: Test methods only on TestScheduler -✅ **Production Safety**: GPUI executors use trait objects -✅ **Session Intelligence**: Cloud gets full coordination features -✅ **Flexible Architecture**: Production vs test deployments -✅ **Backward Compatibility**: All existing functionality preserved +✅ **Object-Safe Trait**: Scheduler trait is object-safe, enabling trait objects without downcasting for core operations +✅ **Internal Task Management**: Scheduler manages task lifecycle and completion state internally, providing unified task tracking +✅ **Clean Separation**: Test methods only on TestScheduler, generic spawn helpers on trait objects +✅ **Production Safety**: GPUI executors use trait objects with minimal dyn dispatch overhead +✅ **Session Intelligence**: Cloud gets full coordination features with automatic task-session association via spawn helpers +✅ **Flexible Architecture**: Production vs test deployments with scheduler implementations optimized for each context +✅ **Backward Compatibility**: All existing functionality preserved via generic spawn helpers on `dyn Scheduler` + +This design keeps test concerns in TestScheduler while maintaining production safety and session coordination capabilities through internal scheduler task management. + +### Benefits of This Approach + +✅ **Interface Compatibility**: GPUI code continues using Future-based spawn, Cloud uses session-aware wrappers +✅ **Performance**: Trait objects have minimal overhead, direct Runnable scheduling in production +✅ **Separation of Concerns**: Low-level Runnable scheduling in trait, high-level Future API as helpers +✅ **Object Safety**: Enables `Arc` usage without runtime downcasting for basic operations + +### Migration Impact -This design keeps test concerns in TestScheduler while maintaining production safety and session coordination capabilities. +- GPUI executors: Simple switch from `Arc` to `Arc` +- Cloud wrapper: Enhanced to automatically associate tasks with sessions via spawn helpers +- Test infrastructure: TestScheduler provides both low-level scheduling and task tracking internally ## Implementation Reference @@ -483,14 +640,16 @@ This design keeps test concerns in TestScheduler while maintaining production sa ### Migration Points **GPUI Areas:** -- Replace `cx.background_executor()` calls with new executors -- Update any direct `PlatformDispatcher` usage -- Preserve `spawn_labeled()` and `deprioritize()` APIs +- Update GPUI executors to use `Arc` trait objects +- Replace `PlatformDispatcher` usage with object-safe `Scheduler` methods and generic spawn helpers +- Preserve `spawn_labeled()` and `deprioritize()` APIs via generic helpers and downcasting +- Update `BackgroundExecutor` and `ForegroundExecutor` to call `dyn Scheduler` spawn helpers **Cloud Areas:** -- Replace `SimulatorRuntime` usage in tests -- Update session management to use new scheduler wrapper -- Preserve `wait_until()` and session validation behavior +- Replace `SimulatorRuntime` with `CloudSimulatedScheduler` wrapper around `dyn Scheduler` +- Implement session management using wrapper's spawn helpers with automatic task association +- Preserve `wait_until()` and session validation via downcast to TestScheduler for task tracking +- Update `ExecutionContext` implementation to use new wrapper ### Test Files Impacted @@ -521,18 +680,18 @@ This design keeps test concerns in TestScheduler while maintaining production sa ## Compatibility Checklist ### GPUI Compatibility -- ✅ `spawn()` → `scheduler.spawn()` -- ✅ `spawn_labeled(label)` → `scheduler.spawn_labeled(label)` -- ✅ `deprioritize()` → Downcast to TestScheduler -- ✅ `timer()` → `scheduler.timer()` -- ✅ `BackgroundExecutor` → Trait object wrapper +- ✅ `spawn()` → `dyn Scheduler::spawn()` (generic helper on trait object) +- ✅ `spawn_labeled(label)` → `dyn Scheduler::spawn_labeled()` (generic helper on trait object) +- ✅ `deprioritize()` → Downcast to TestScheduler, then `TestScheduler::deprioritize()` +- ✅ `timer()` → `scheduler.timer()` (platform method on trait object) +- ✅ `BackgroundExecutor` → Trait object wrapper using `dyn Scheduler` ### Cloud Compatibility -- ✅ `ExecutionContext::wait_until()` → Scheduler wrapper -- ✅ Session validation → `validate_session_cleanup()` -- ✅ Automatic session association → Wrapper intelligence -- ✅ Task cleanup checking → TestScheduler task tracking -- ✅ `spawn()` in sessions → Auto-association +- ✅ `ExecutionContext::wait_until()` → Scheduler wrapper with generic spawn helpers +- ✅ Session validation → `validate_session_cleanup()` with downcast to TestScheduler +- ✅ Automatic session association → Via spawn helpers and task metadata +- ✅ Task cleanup checking → Internal scheduler task tracking (downcast to TestScheduler for running status) +- ✅ `spawn()` in sessions → `dyn Scheduler::spawn()` with auto-association in wrapper ### Test Compatibility - ✅ Test determinism → TestScheduler deprioritization