Avoid stalling server when test notifications aren't being processed

Antonio Scandurra created

Change summary

crates/project/src/worktree.rs |  4 ++++
crates/server/src/rpc.rs       | 27 ++++++++++++++++-----------
2 files changed, 20 insertions(+), 11 deletions(-)

Detailed changes

crates/project/src/worktree.rs 🔗

@@ -869,6 +869,10 @@ impl RemoteWorktree {
         Ok(())
     }
 
+    pub fn has_pending_updates(&self) -> bool {
+        !self.pending_updates.is_empty()
+    }
+
     pub fn update_diagnostic_summary(
         &mut self,
         path: Arc<Path>,

crates/server/src/rpc.rs 🔗

@@ -9,9 +9,8 @@ use anyhow::anyhow;
 use async_std::task;
 use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
 use collections::{HashMap, HashSet};
-use futures::{future::BoxFuture, FutureExt, StreamExt};
+use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
 use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
-use postage::{mpsc, prelude::Sink as _};
 use rpc::{
     proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
     Connection, ConnectionId, Peer, TypedEnvelope,
@@ -38,7 +37,7 @@ pub struct Server {
     store: RwLock<Store>,
     app_state: Arc<AppState>,
     handlers: HashMap<TypeId, MessageHandler>,
-    notifications: Option<mpsc::Sender<()>>,
+    notifications: Option<mpsc::UnboundedSender<()>>,
 }
 
 pub trait Executor {
@@ -54,7 +53,7 @@ impl Server {
     pub fn new(
         app_state: Arc<AppState>,
         peer: Arc<Peer>,
-        notifications: Option<mpsc::Sender<()>>,
+        notifications: Option<mpsc::UnboundedSender<()>>,
     ) -> Arc<Self> {
         let mut server = Self {
             peer,
@@ -155,7 +154,7 @@ impl Server {
         connection: Connection,
         addr: String,
         user_id: UserId,
-        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
+        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
         executor: E,
     ) -> impl Future<Output = ()> {
         let mut this = self.clone();
@@ -1095,7 +1094,7 @@ mod tests {
     use collections::BTreeMap;
     use gpui::{executor, ModelHandle, TestAppContext};
     use parking_lot::Mutex;
-    use postage::{mpsc, watch};
+    use postage::{sink::Sink, watch};
     use rand::prelude::*;
     use rpc::PeerId;
     use serde_json::json;
@@ -3769,8 +3768,14 @@ mod tests {
                         project
                             .worktrees(cx)
                             .map(|worktree| {
-                                let snapshot = worktree.read(cx).snapshot();
-                                (snapshot.id(), snapshot)
+                                let worktree = worktree.read(cx);
+                                assert!(
+                                    !worktree.as_remote().unwrap().has_pending_updates(),
+                                    "Guest {} worktree {:?} contains deferred updates",
+                                    guest_id,
+                                    worktree.id()
+                                );
+                                (worktree.id(), worktree.snapshot())
                             })
                             .collect::<BTreeMap<_, _>>()
                     });
@@ -3837,7 +3842,7 @@ mod tests {
         app_state: Arc<AppState>,
         server: Arc<Server>,
         foreground: Rc<executor::Foreground>,
-        notifications: mpsc::Receiver<()>,
+        notifications: mpsc::UnboundedReceiver<()>,
         connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
         forbid_connections: Arc<AtomicBool>,
         _test_db: TestDb,
@@ -3849,7 +3854,7 @@ mod tests {
             test_db.set_clean_pool_on_drop(clean_db_pool_on_drop);
             let app_state = Self::build_app_state(&test_db).await;
             let peer = Peer::new();
-            let notifications = mpsc::channel(128);
+            let notifications = mpsc::unbounded();
             let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
             Self {
                 peer,
@@ -3871,7 +3876,7 @@ mod tests {
             let server = self.server.clone();
             let connection_killers = self.connection_killers.clone();
             let forbid_connections = self.forbid_connections.clone();
-            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
+            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
 
             Arc::get_mut(&mut client)
                 .unwrap()