Use an unbounded channel in gpui test helper methods

Max Brunsfeld created

The bounded channel could fill up when many events were emitted in one
effect cycle.

Change summary

crates/gpui/src/app.rs | 31 +++++++++++++------------------
1 file changed, 13 insertions(+), 18 deletions(-)

Detailed changes

crates/gpui/src/app.rs 🔗

@@ -3392,12 +3392,10 @@ impl<T: Entity> ModelHandle<T> {
 
     #[cfg(any(test, feature = "test-support"))]
     pub fn next_notification(&self, cx: &TestAppContext) -> impl Future<Output = ()> {
-        use postage::prelude::{Sink as _, Stream as _};
-
-        let (mut tx, mut rx) = postage::mpsc::channel(1);
+        let (tx, mut rx) = futures::channel::mpsc::unbounded();
         let mut cx = cx.cx.borrow_mut();
         let subscription = cx.observe(self, move |_, _| {
-            tx.try_send(()).ok();
+            tx.unbounded_send(()).ok();
         });
 
         let duration = if std::env::var("CI").is_ok() {
@@ -3407,7 +3405,7 @@ impl<T: Entity> ModelHandle<T> {
         };
 
         async move {
-            let notification = crate::util::timeout(duration, rx.recv())
+            let notification = crate::util::timeout(duration, rx.next())
                 .await
                 .expect("next notification timed out");
             drop(subscription);
@@ -3420,12 +3418,10 @@ impl<T: Entity> ModelHandle<T> {
     where
         T::Event: Clone,
     {
-        use postage::prelude::{Sink as _, Stream as _};
-
-        let (mut tx, mut rx) = postage::mpsc::channel(1);
+        let (tx, mut rx) = futures::channel::mpsc::unbounded();
         let mut cx = cx.cx.borrow_mut();
         let subscription = cx.subscribe(self, move |_, event, _| {
-            tx.blocking_send(event.clone()).ok();
+            tx.unbounded_send(event.clone()).ok();
         });
 
         let duration = if std::env::var("CI").is_ok() {
@@ -3434,8 +3430,9 @@ impl<T: Entity> ModelHandle<T> {
             Duration::from_secs(1)
         };
 
+        cx.foreground.start_waiting();
         async move {
-            let event = crate::util::timeout(duration, rx.recv())
+            let event = crate::util::timeout(duration, rx.next())
                 .await
                 .expect("next event timed out");
             drop(subscription);
@@ -3449,22 +3446,20 @@ impl<T: Entity> ModelHandle<T> {
         cx: &TestAppContext,
         mut predicate: impl FnMut(&T, &AppContext) -> bool,
     ) -> impl Future<Output = ()> {
-        use postage::prelude::{Sink as _, Stream as _};
-
-        let (tx, mut rx) = postage::mpsc::channel(1024);
+        let (tx, mut rx) = futures::channel::mpsc::unbounded();
 
         let mut cx = cx.cx.borrow_mut();
         let subscriptions = (
             cx.observe(self, {
-                let mut tx = tx.clone();
+                let tx = tx.clone();
                 move |_, _| {
-                    tx.blocking_send(()).ok();
+                    tx.unbounded_send(()).ok();
                 }
             }),
             cx.subscribe(self, {
-                let mut tx = tx.clone();
+                let tx = tx.clone();
                 move |_, _, _| {
-                    tx.blocking_send(()).ok();
+                    tx.unbounded_send(()).ok();
                 }
             }),
         );
@@ -3495,7 +3490,7 @@ impl<T: Entity> ModelHandle<T> {
                     }
 
                     cx.borrow().foreground().start_waiting();
-                    rx.recv()
+                    rx.next()
                         .await
                         .expect("model dropped with pending condition");
                     cx.borrow().foreground().finish_waiting();