Enforce a Send bound on next frame callbacks

Nathan Sobo and Julia Risley created

This required using mpsc channels to invoke frame callbacks on the
main thread and send the receiver to the platform display link.

Co-Authored-By: Julia Risley <julia@zed.dev>

Change summary

crates/gpui2/src/app.rs                     | 24 +++----
crates/gpui2/src/app/async_context.rs       |  7 --
crates/gpui2/src/app/test_context.rs        |  4 
crates/gpui2/src/elements/img.rs            |  2 
crates/gpui2/src/platform/mac/dispatcher.rs |  2 
crates/gpui2/src/window.rs                  | 74 +++++++++-------------
6 files changed, 44 insertions(+), 69 deletions(-)

Detailed changes

crates/gpui2/src/app.rs 🔗

@@ -39,18 +39,20 @@ use std::{
 };
 use util::http::{self, HttpClient};
 
+/// Temporary(?) wrapper around RefCell<AppContext> to help us debug any double borrows.
+/// Strongly consider removing after stabilization.
 pub struct AppCell {
     app: RefCell<AppContext>,
 }
+
 impl AppCell {
     pub fn borrow(&self) -> AppRef {
         AppRef(self.app.borrow())
     }
 
     pub fn borrow_mut(&self) -> AppRefMut {
-        let thread_id = std::thread::current().id();
-
-        eprintln!(">>> borrowing {thread_id:?}");
+        // let thread_id = std::thread::current().id();
+        // dbg!("borrowed {thread_id:?}");
         AppRefMut(self.app.borrow_mut())
     }
 }
@@ -84,7 +86,6 @@ impl App {
         let this = self.0.clone();
         let platform = self.0.borrow().platform.clone();
         platform.run(Box::new(move || {
-            dbg!("run callback");
             let cx = &mut *this.borrow_mut();
             on_finish_launching(cx);
         }));
@@ -110,14 +111,11 @@ impl App {
         F: 'static + FnMut(&mut AppContext),
     {
         let this = Rc::downgrade(&self.0);
-        self.0
-            .borrow_mut()
-            .platform
-            .on_reopen(Box::new(move || {
-                if let Some(app) = this.upgrade() {
-                    callback(&mut app.borrow_mut());
-                }
-            }));
+        self.0.borrow_mut().platform.on_reopen(Box::new(move || {
+            if let Some(app) = this.upgrade() {
+                callback(&mut app.borrow_mut());
+            }
+        }));
         self
     }
 
@@ -139,7 +137,7 @@ impl App {
 }
 
 type ActionBuilder = fn(json: Option<serde_json::Value>) -> anyhow::Result<Box<dyn Action>>;
-type FrameCallback = Box<dyn FnOnce(&mut AppContext)>;
+pub(crate) type FrameCallback = Box<dyn FnOnce(&mut AppContext)>;
 type Handler = Box<dyn FnMut(&mut AppContext) -> bool + 'static>;
 type Listener = Box<dyn FnMut(&dyn Any, &mut AppContext) -> bool + 'static>;
 type QuitHandler = Box<dyn FnOnce(&mut AppContext) -> LocalBoxFuture<'static, ()> + 'static>;

crates/gpui2/src/app/async_context.rs 🔗

@@ -28,7 +28,6 @@ impl Context for AsyncAppContext {
             .app
             .upgrade()
             .ok_or_else(|| anyhow!("app was released"))?;
-        dbg!("BUILD MODEL A");
         let mut app = app.borrow_mut();
         Ok(app.build_model(build_model))
     }
@@ -42,7 +41,6 @@ impl Context for AsyncAppContext {
             .app
             .upgrade()
             .ok_or_else(|| anyhow!("app was released"))?;
-        dbg!("UPDATE MODEL B");
         let mut app = app.borrow_mut();
         Ok(app.update_model(handle, update))
     }
@@ -52,7 +50,6 @@ impl Context for AsyncAppContext {
         F: FnOnce(AnyView, &mut WindowContext<'_>) -> T,
     {
         let app = self.app.upgrade().context("app was released")?;
-        dbg!("UPDATE WINDOW C");
         let mut lock = app.borrow_mut();
         lock.update_window(window, f)
     }
@@ -64,7 +61,6 @@ impl AsyncAppContext {
             .app
             .upgrade()
             .ok_or_else(|| anyhow!("app was released"))?;
-        dbg!("REFRESH");
         let mut lock = app.borrow_mut();
         lock.refresh();
         Ok(())
@@ -125,7 +121,6 @@ impl AsyncAppContext {
             .app
             .upgrade()
             .ok_or_else(|| anyhow!("app was released"))?;
-        dbg!("read global");
         let app = app.borrow_mut();
         Ok(read(app.global(), &app))
     }
@@ -135,7 +130,6 @@ impl AsyncAppContext {
         read: impl FnOnce(&G, &AppContext) -> R,
     ) -> Option<R> {
         let app = self.app.upgrade()?;
-        dbg!("try read global");
         let app = app.borrow_mut();
         Some(read(app.try_global()?, &app))
     }
@@ -148,7 +142,6 @@ impl AsyncAppContext {
             .app
             .upgrade()
             .ok_or_else(|| anyhow!("app was released"))?;
-        dbg!("update global");
         let mut app = app.borrow_mut();
         Ok(app.update_global(update))
     }

crates/gpui2/src/app/test_context.rs 🔗

@@ -1,7 +1,7 @@
 use crate::{
-    AnyView, AnyWindowHandle, AppContext, AsyncAppContext, BackgroundExecutor, Context,
+    AnyView, AnyWindowHandle, AppCell, AppContext, AsyncAppContext, BackgroundExecutor, Context,
     EventEmitter, ForegroundExecutor, Model, ModelContext, Result, Task, TestDispatcher,
-    TestPlatform, WindowContext, AppCell,
+    TestPlatform, WindowContext,
 };
 use anyhow::{anyhow, bail};
 use futures::{Stream, StreamExt};

crates/gpui2/src/elements/img.rs 🔗

@@ -125,9 +125,7 @@ where
             } else {
                 cx.spawn(|_, mut cx| async move {
                     if image_future.await.log_err().is_some() {
-                        eprintln!(">>> on_next_frame");
                         cx.on_next_frame(|cx| cx.notify());
-                        eprintln!("<<< on_next_frame")
                     }
                 })
                 .detach()

crates/gpui2/src/platform/mac/dispatcher.rs 🔗

@@ -42,7 +42,6 @@ impl PlatformDispatcher for MacDispatcher {
     }
 
     fn dispatch(&self, runnable: Runnable) {
-        println!("DISPATCH");
         unsafe {
             dispatch_async_f(
                 dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT.try_into().unwrap(), 0),
@@ -53,7 +52,6 @@ impl PlatformDispatcher for MacDispatcher {
     }
 
     fn dispatch_on_main_thread(&self, runnable: Runnable) {
-        println!("DISPATCH ON MAIN THREAD");
         unsafe {
             dispatch_async_f(
                 dispatch_get_main_queue(),

crates/gpui2/src/window.rs 🔗

@@ -410,67 +410,55 @@ impl<'a> WindowContext<'a> {
     }
 
     /// Schedule the given closure to be run directly after the current frame is rendered.
-    pub fn on_next_frame(&mut self, f: impl FnOnce(&mut WindowContext) + 'static) {
-        let f = Box::new(f);
+    pub fn on_next_frame(&mut self, callback: impl FnOnce(&mut WindowContext) + 'static) {
+        let handle = self.window.handle;
         let display_id = self.window.display_id;
 
-        self.next_frame_callbacks
-            .entry(display_id)
-            .or_default()
-            .push(f);
-
-        self.frame_consumers.entry(display_id).or_insert_with(|| {
-            let (tx, rx) = mpsc::unbounded::<()>();
+        if !self.frame_consumers.contains_key(&display_id) {
+            let (tx, mut rx) = mpsc::unbounded::<()>();
+            self.platform.set_display_link_output_callback(
+                display_id,
+                Box::new(move |_current_time, _output_time| _ = tx.unbounded_send(())),
+            );
 
-            self.spawn(|cx| async move {
+            let consumer_task = self.app.spawn(|cx| async move {
                 while rx.next().await.is_some() {
-                    let _ = cx.update(|_, cx| {
+                    cx.update(|cx| {
                         for callback in cx
-                            .app
                             .next_frame_callbacks
                             .get_mut(&display_id)
                             .unwrap()
                             .drain(..)
+                            .collect::<SmallVec<[_; 32]>>()
                         {
                             callback(cx);
                         }
-                    });
-                }
-            })
-        });
+                    })
+                    .ok();
 
-        if let Some(callbacks) = self.next_frame_callbacks.get_mut(&display_id) {
-            callbacks.push(f);
-            // If there was already a callback, it means that we already scheduled a frame.
-            if callbacks.len() > 1 {
-                return;
-            }
-        } else {
-            let mut async_cx = self.to_async();
-            self.next_frame_callbacks.insert(display_id, vec![f]);
-            self.platform.set_display_link_output_callback(
-                display_id,
-                Box::new(move |_current_time, _output_time| {
-                    let _ = async_cx.update(|_, cx| {
-                        let callbacks = cx
-                            .next_frame_callbacks
-                            .get_mut(&display_id)
-                            .unwrap()
-                            .drain(..)
-                            .collect::<Vec<_>>();
-                        for callback in callbacks {
-                            callback(cx);
-                        }
+                    // Flush effects, then stop the display link if no new next_frame_callbacks have been added.
 
-                        if cx.next_frame_callbacks.get(&display_id).unwrap().is_empty() {
+                    cx.update(|cx| {
+                        if cx.next_frame_callbacks.is_empty() {
                             cx.platform.stop_display_link(display_id);
                         }
-                    });
-                }),
-            );
+                    })
+                    .ok();
+                }
+            });
+            self.frame_consumers.insert(display_id, consumer_task);
+        }
+
+        if self.next_frame_callbacks.is_empty() {
+            self.platform.start_display_link(display_id);
         }
 
-        self.platform.start_display_link(display_id);
+        self.next_frame_callbacks
+            .entry(display_id)
+            .or_default()
+            .push(Box::new(move |cx: &mut AppContext| {
+                cx.update_window(handle, |_root_view, cx| callback(cx)).ok();
+            }));
     }
 
     /// Spawn the future returned by the given closure on the application thread pool.