WIP - flush_fs_events

Max Brunsfeld created

Change summary

crates/gpui2/src/app/model_context.rs |  6 ++--
crates/gpui2/src/app/test_context.rs  | 41 +++++++++++++++++++++++++++-
crates/project2/src/worktree.rs       | 16 ++++++----
3 files changed, 51 insertions(+), 12 deletions(-)

Detailed changes

crates/gpui2/src/app/model_context.rs 🔗

@@ -40,15 +40,15 @@ impl<'a, T: 'static> ModelContext<'a, T> {
         self.model_state.clone()
     }
 
-    pub fn observe<T2, E>(
+    pub fn observe<W, E>(
         &mut self,
         entity: &E,
         mut on_notify: impl FnMut(&mut T, E, &mut ModelContext<'_, T>) + 'static,
     ) -> Subscription
     where
         T: 'static,
-        T2: 'static,
-        E: Entity<T2>,
+        W: 'static,
+        E: Entity<W>,
     {
         let this = self.weak_model();
         let entity_id = entity.entity_id();

crates/gpui2/src/app/test_context.rs 🔗

@@ -3,8 +3,9 @@ use crate::{
     ForegroundExecutor, Model, ModelContext, Result, Task, TestDispatcher, TestPlatform,
     WindowContext,
 };
-use futures::SinkExt;
-use std::{cell::RefCell, future::Future, rc::Rc, sync::Arc};
+use anyhow::anyhow;
+use futures::{SinkExt, StreamExt};
+use std::{cell::RefCell, future::Future, rc::Rc, sync::Arc, time::Duration};
 
 #[derive(Clone)]
 pub struct TestAppContext {
@@ -158,4 +159,40 @@ impl TestAppContext {
             .detach();
         rx
     }
+
+    pub async fn condition<T: EventEmitter + 'static>(
+        &mut self,
+        model: &Model<T>,
+        mut predicate: impl FnMut(&mut T, &mut ModelContext<T>) -> bool,
+    ) {
+        let (mut tx, mut rx) = futures::channel::mpsc::unbounded::<()>();
+        let timer = self.executor().timer(Duration::from_secs(3));
+
+        let subscriptions = model.update(self, move |_, cx| {
+            (
+                cx.observe(model, move |_, _, _| {
+                    // let _ = tx.send(());
+                }),
+                cx.subscribe(model, move |_, _, _, _| {
+                    let _ = tx.send(());
+                }),
+            )
+        });
+
+        use futures::FutureExt as _;
+        use smol::future::FutureExt as _;
+
+        async {
+            while rx.next().await.is_some() {
+                if model.update(self, &mut predicate) {
+                    return Ok(());
+                }
+            }
+            drop(subscriptions);
+            unreachable!()
+        }
+        .race(timer.map(|_| Err(anyhow!("condition timed out"))))
+        .await
+        .unwrap();
+    }
 }

crates/project2/src/worktree.rs 🔗

@@ -17,7 +17,7 @@ use futures::{
     },
     select_biased,
     task::Poll,
-    FutureExt, Stream, StreamExt,
+    FutureExt as _, Stream, StreamExt,
 };
 use fuzzy2::CharBag;
 use git::{DOT_GIT, GITIGNORE};
@@ -4053,7 +4053,8 @@ impl WorktreeModelHandle for Model<Worktree> {
         &self,
         cx: &'a mut gpui2::TestAppContext,
     ) -> futures::future::LocalBoxFuture<'a, ()> {
-        let filename = "fs-event-sentinel";
+        let file_name = "fs-event-sentinel";
+
         let tree = self.clone();
         let (fs, root_path) = self.update(cx, |tree, _| {
             let tree = tree.as_local().unwrap();
@@ -4061,16 +4062,17 @@ impl WorktreeModelHandle for Model<Worktree> {
         });
 
         async move {
-            fs.create_file(&root_path.join(filename), Default::default())
+            fs.create_file(&root_path.join(file_name), Default::default())
                 .await
                 .unwrap();
+            cx.condition(&tree, |tree, _| tree.entry_for_path(file_name).is_some())
+                .await;
 
-            assert!(tree.update(cx, |tree, _| tree.entry_for_path(filename).is_some()));
-
-            fs.remove_file(&root_path.join(filename), Default::default())
+            fs.remove_file(&root_path.join(file_name), Default::default())
                 .await
                 .unwrap();
-            assert!(tree.update(cx, |tree, _| tree.entry_for_path(filename).is_none()));
+            cx.condition(&tree, |tree, _| tree.entry_for_path(file_name).is_none())
+                .await;
 
             cx.update(|cx| tree.read(cx).as_local().unwrap().scan_complete())
                 .await;