From b354af7bdac44cce7b972027dc89a4a1a69a9735 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 6 Apr 2022 22:02:04 -0700 Subject: [PATCH] Use an unbounded channel in gpui test helper methods The bounded channel could fill up when many events were emitted in one effect cycle. --- crates/gpui/src/app.rs | 31 +++++++++++++------------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/crates/gpui/src/app.rs b/crates/gpui/src/app.rs index 6eb5a8f6165cd2e5053701474018c7365e18072a..dcdd325eae78b9f8e96baa68e4d7f1c3904dac12 100644 --- a/crates/gpui/src/app.rs +++ b/crates/gpui/src/app.rs @@ -3392,12 +3392,10 @@ impl ModelHandle { #[cfg(any(test, feature = "test-support"))] pub fn next_notification(&self, cx: &TestAppContext) -> impl Future { - use postage::prelude::{Sink as _, Stream as _}; - - let (mut tx, mut rx) = postage::mpsc::channel(1); + let (tx, mut rx) = futures::channel::mpsc::unbounded(); let mut cx = cx.cx.borrow_mut(); let subscription = cx.observe(self, move |_, _| { - tx.try_send(()).ok(); + tx.unbounded_send(()).ok(); }); let duration = if std::env::var("CI").is_ok() { @@ -3407,7 +3405,7 @@ impl ModelHandle { }; async move { - let notification = crate::util::timeout(duration, rx.recv()) + let notification = crate::util::timeout(duration, rx.next()) .await .expect("next notification timed out"); drop(subscription); @@ -3420,12 +3418,10 @@ impl ModelHandle { where T::Event: Clone, { - use postage::prelude::{Sink as _, Stream as _}; - - let (mut tx, mut rx) = postage::mpsc::channel(1); + let (tx, mut rx) = futures::channel::mpsc::unbounded(); let mut cx = cx.cx.borrow_mut(); let subscription = cx.subscribe(self, move |_, event, _| { - tx.blocking_send(event.clone()).ok(); + tx.unbounded_send(event.clone()).ok(); }); let duration = if std::env::var("CI").is_ok() { @@ -3434,8 +3430,9 @@ impl ModelHandle { Duration::from_secs(1) }; + cx.foreground.start_waiting(); async move { - let event = crate::util::timeout(duration, rx.recv()) + let event = crate::util::timeout(duration, rx.next()) .await .expect("next event timed out"); drop(subscription); @@ -3449,22 +3446,20 @@ impl ModelHandle { cx: &TestAppContext, mut predicate: impl FnMut(&T, &AppContext) -> bool, ) -> impl Future { - use postage::prelude::{Sink as _, Stream as _}; - - let (tx, mut rx) = postage::mpsc::channel(1024); + let (tx, mut rx) = futures::channel::mpsc::unbounded(); let mut cx = cx.cx.borrow_mut(); let subscriptions = ( cx.observe(self, { - let mut tx = tx.clone(); + let tx = tx.clone(); move |_, _| { - tx.blocking_send(()).ok(); + tx.unbounded_send(()).ok(); } }), cx.subscribe(self, { - let mut tx = tx.clone(); + let tx = tx.clone(); move |_, _, _| { - tx.blocking_send(()).ok(); + tx.unbounded_send(()).ok(); } }), ); @@ -3495,7 +3490,7 @@ impl ModelHandle { } cx.borrow().foreground().start_waiting(); - rx.recv() + rx.next() .await .expect("model dropped with pending condition"); cx.borrow().foreground().finish_waiting();